Skip to content

Commit ca85e3a

Browse files
author
Aaron Lehmann
committed
raft: Use TransferLeadership to make leader demotion safer
When we demote the leader, we currently wait for all queued messages to be sent, as a best-effort approach to making sure the other nodes find out that the node removal has been committed, and stop treating the current leader as a cluster member. This doesn't work perfectly. To make this more robust, use TransferLeadership when the leader is trying to remove itself. The new leader's reconciliation loop will kick in and remove the old leader. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
1 parent ce8e78a commit ca85e3a

File tree

5 files changed

+95
-110
lines changed

5 files changed

+95
-110
lines changed

manager/controlapi/node_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ func TestUpdateNode(t *testing.T) {
532532
assert.Error(t, err)
533533
}
534534

535-
func testUpdateNodeDemote(leader bool, t *testing.T) {
535+
func testUpdateNodeDemote(t *testing.T) {
536536
tc := cautils.NewTestCA(nil)
537537
defer tc.Stop()
538538
ts := newTestServer(t)
@@ -654,14 +654,8 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
654654
return nil
655655
}))
656656

657-
var demoteNode, lastNode *raftutils.TestNode
658-
if leader {
659-
demoteNode = nodes[1]
660-
lastNode = nodes[2]
661-
} else {
662-
demoteNode = nodes[2]
663-
lastNode = nodes[1]
664-
}
657+
demoteNode := nodes[2]
658+
lastNode := nodes[1]
665659

666660
raftMember = ts.Server.raft.GetMemberByNodeID(demoteNode.SecurityConfig.ClientTLSCreds.NodeID())
667661
assert.NotNil(t, raftMember)
@@ -734,10 +728,5 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
734728

735729
func TestUpdateNodeDemote(t *testing.T) {
736730
t.Parallel()
737-
testUpdateNodeDemote(false, t)
738-
}
739-
740-
func TestUpdateNodeDemoteLeader(t *testing.T) {
741-
t.Parallel()
742-
testUpdateNodeDemote(true, t)
731+
testUpdateNodeDemote(t)
743732
}

manager/role_manager.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,15 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
137137
defer rmCancel()
138138

139139
if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
140-
// TODO(aaronl): Retry later
141-
log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
140+
if err == raft.ErrCannotRemoveSelf {
141+
// Don't use rmCtx, because we expect to lose
142+
// leadership, which will cancel this context.
143+
log.L.Info("demoted; transferring leadership")
144+
rm.raft.TransferLeadership(context.Background())
145+
} else {
146+
// TODO(aaronl): Retry later
147+
log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
148+
}
142149
return
143150
}
144151
}

manager/state/raft/raft.go

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ var (
5757
// ErrMemberUnknown is sent in response to a message from an
5858
// unrecognized peer.
5959
ErrMemberUnknown = errors.New("raft: member unknown")
60+
// ErrCannotRemoveSelf is returned if RemoveMember is called with the
61+
// local node as the argument.
62+
ErrCannotRemoveSelf = errors.New("raft: can't remove self")
6063
)
6164

6265
// LeadershipState indicates whether the node is a leader or follower.
@@ -412,7 +415,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
412415
defer conn.Close()
413416
client := api.NewRaftMembershipClient(conn)
414417

415-
joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
418+
joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
416419
defer joinCancel()
417420
resp, err := client.Join(joinCtx, &api.JoinRequest{
418421
Addr: n.opts.Addr,
@@ -1030,6 +1033,10 @@ func (n *Node) UpdateNode(id uint64, addr string) {
10301033
// from a member who is willing to leave its raft
10311034
// membership to an active member of the raft
10321035
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
1036+
if req.Node == nil {
1037+
return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
1038+
}
1039+
10331040
nodeInfo, err := ca.RemoteNode(ctx)
10341041
if err != nil {
10351042
return nil, err
@@ -1086,6 +1093,10 @@ func (n *Node) CanRemoveMember(id uint64) bool {
10861093
}
10871094

10881095
func (n *Node) removeMember(ctx context.Context, id uint64) error {
1096+
if id == n.Config.ID {
1097+
return ErrCannotRemoveSelf
1098+
}
1099+
10891100
// can't stop the raft node while an async RPC is in progress
10901101
n.stopMu.RLock()
10911102
defer n.stopMu.RUnlock()
@@ -1100,18 +1111,58 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
11001111

11011112
n.membershipLock.Lock()
11021113
defer n.membershipLock.Unlock()
1103-
if n.CanRemoveMember(id) {
1104-
cc := raftpb.ConfChange{
1105-
ID: id,
1106-
Type: raftpb.ConfChangeRemoveNode,
1107-
NodeID: id,
1108-
Context: []byte(""),
1109-
}
1110-
err := n.configure(ctx, cc)
1111-
return err
1114+
if !n.CanRemoveMember(id) {
1115+
return ErrCannotRemoveMember
11121116
}
11131117

1114-
return ErrCannotRemoveMember
1118+
cc := raftpb.ConfChange{
1119+
ID: id,
1120+
Type: raftpb.ConfChangeRemoveNode,
1121+
NodeID: id,
1122+
Context: []byte(""),
1123+
}
1124+
return n.configure(ctx, cc)
1125+
}
1126+
1127+
// TransferLeadership attempts to transfer leadership to a different node,
1128+
// and wait for the transfer to happen.
1129+
func (n *Node) TransferLeadership(ctx context.Context) error {
1130+
ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
1131+
defer cancelTransfer()
1132+
1133+
n.stopMu.RLock()
1134+
defer n.stopMu.RUnlock()
1135+
1136+
if !n.IsMember() {
1137+
return ErrNoRaftMember
1138+
}
1139+
1140+
if !n.isLeader() {
1141+
return ErrLostLeadership
1142+
}
1143+
1144+
transferee, err := n.transport.LongestActive()
1145+
if err != nil {
1146+
return errors.Wrap(err, "failed to get longest-active member")
1147+
}
1148+
start := time.Now()
1149+
n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
1150+
ticker := time.NewTicker(n.opts.TickInterval / 10)
1151+
defer ticker.Stop()
1152+
var leader uint64
1153+
for {
1154+
leader = n.leader()
1155+
if leader != raft.None && leader != n.Config.ID {
1156+
break
1157+
}
1158+
select {
1159+
case <-ctx.Done():
1160+
return ctx.Err()
1161+
case <-ticker.C:
1162+
}
1163+
}
1164+
log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start))
1165+
return nil
11151166
}
11161167

11171168
// RemoveMember submits a configuration change to remove a member from the raft cluster
@@ -1726,23 +1777,12 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
17261777
}
17271778

17281779
if cc.NodeID == n.Config.ID {
1729-
// wait the commit ack to be sent before closing connection
1780+
// wait for the commit ack to be sent before closing connection
17301781
n.asyncTasks.Wait()
17311782

17321783
n.NodeRemoved()
1733-
// if there are only 2 nodes in the cluster, and leader is leaving
1734-
// before closing the connection, leader has to ensure that follower gets
1735-
// noticed about this raft conf change commit. Otherwise, follower would
1736-
// assume there are still 2 nodes in the cluster and won't get elected
1737-
// into the leader by acquiring the majority (2 nodes)
1738-
1739-
// while n.asyncTasks.Wait() could be helpful in this case
1740-
// it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds)
1741-
// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
1742-
} else {
1743-
if err := n.transport.RemovePeer(cc.NodeID); err != nil {
1744-
return err
1745-
}
1784+
} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
1785+
return err
17461786
}
17471787

17481788
return n.cluster.RemoveMember(cc.NodeID)
@@ -1852,3 +1892,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
18521892
}
18531893
return sids
18541894
}
1895+
1896+
func (n *Node) reqTimeout() time.Duration {
1897+
return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
1898+
}

manager/state/raft/raft_test.go

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -327,74 +327,6 @@ func TestRaftFollowerLeave(t *testing.T) {
327327
assert.Len(t, nodes[4].GetMemberlist(), 4)
328328
}
329329

330-
func TestRaftLeaderLeave(t *testing.T) {
331-
t.Parallel()
332-
333-
nodes, clockSource := raftutils.NewRaftCluster(t, tc)
334-
defer raftutils.TeardownCluster(t, nodes)
335-
336-
// node 1 is the leader
337-
assert.Equal(t, nodes[1].Leader(), nodes[1].Config.ID)
338-
339-
// Try to leave the raft
340-
// Use gRPC instead of calling handler directly because of
341-
// authorization check.
342-
cc, err := dial(nodes[1], nodes[1].Address)
343-
assert.NoError(t, err)
344-
raftClient := api.NewRaftMembershipClient(cc)
345-
defer cc.Close()
346-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
347-
defer cancel()
348-
resp, err := raftClient.Leave(ctx, &api.LeaveRequest{Node: &api.RaftMember{RaftID: nodes[1].Config.ID}})
349-
assert.NoError(t, err, "error sending message to leave the raft")
350-
assert.NotNil(t, resp, "leave response message is nil")
351-
352-
newCluster := map[uint64]*raftutils.TestNode{
353-
2: nodes[2],
354-
3: nodes[3],
355-
}
356-
// Wait for election tick
357-
raftutils.WaitForCluster(t, clockSource, newCluster)
358-
359-
// Leader should not be 1
360-
assert.NotEqual(t, nodes[2].Leader(), nodes[1].Config.ID)
361-
assert.Equal(t, nodes[2].Leader(), nodes[3].Leader())
362-
363-
leader := nodes[2].Leader()
364-
365-
// Find the leader node and a follower node
366-
var (
367-
leaderNode *raftutils.TestNode
368-
followerNode *raftutils.TestNode
369-
)
370-
for i, n := range nodes {
371-
if n.Config.ID == leader {
372-
leaderNode = n
373-
if i == 2 {
374-
followerNode = nodes[3]
375-
} else {
376-
followerNode = nodes[2]
377-
}
378-
}
379-
}
380-
381-
require.NotNil(t, leaderNode)
382-
require.NotNil(t, followerNode)
383-
384-
// Propose a value
385-
value, err := raftutils.ProposeValue(t, leaderNode, DefaultProposalTime)
386-
assert.NoError(t, err, "failed to propose value")
387-
388-
// The value should be replicated on all remaining nodes
389-
raftutils.CheckValue(t, clockSource, leaderNode, value)
390-
assert.Len(t, leaderNode.GetMemberlist(), 2)
391-
392-
raftutils.CheckValue(t, clockSource, followerNode, value)
393-
assert.Len(t, followerNode.GetMemberlist(), 2)
394-
395-
raftutils.TeardownCluster(t, newCluster)
396-
}
397-
398330
func TestRaftNewNodeGetsData(t *testing.T) {
399331
t.Parallel()
400332

manager/state/raft/transport/transport.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool {
295295
return active
296296
}
297297

298+
// LongestActive returns the ID of the peer that has been active for the longest
299+
// length of time.
300+
func (t *Transport) LongestActive() (uint64, error) {
301+
p, err := t.longestActive()
302+
if err != nil {
303+
return 0, err
304+
}
305+
306+
return p.id, nil
307+
}
308+
309+
// longestActive returns the peer that has been active for the longest length of
310+
// time.
298311
func (t *Transport) longestActive() (*peer, error) {
299312
var longest *peer
300313
var longestTime time.Time

0 commit comments

Comments
 (0)