Skip to content

Commit f7b7644

Browse files
author
Aaron Lehmann
committed
raft: Use TransferLeadership to make leader demotion safer
When we demote the leader, we currently wait for all queued messages to be sent, as a best-effort approach to making sure the other nodes find out that the node removal has been committed, and stop treating the current leader as a cluster member. This doesn't work perfectly. To make this more robust, use TransferLeadership when the leader is trying to remove itself, then call the Leave RPC on the new leader once the transfer has completed. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
1 parent ce8e78a commit f7b7644

File tree

2 files changed

+76
-9
lines changed

2 files changed

+76
-9
lines changed

manager/state/raft/raft.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,10 @@ func (n *Node) UpdateNode(id uint64, addr string) {
10301030
// from a member who is willing to leave its raft
10311031
// membership to an active member of the raft
10321032
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
1033+
if req.Node == nil {
1034+
return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
1035+
}
1036+
10331037
nodeInfo, err := ca.RemoteNode(ctx)
10341038
if err != nil {
10351039
return nil, err
@@ -1100,18 +1104,69 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
11001104

11011105
n.membershipLock.Lock()
11021106
defer n.membershipLock.Unlock()
1103-
if n.CanRemoveMember(id) {
1104-
cc := raftpb.ConfChange{
1105-
ID: id,
1106-
Type: raftpb.ConfChangeRemoveNode,
1107-
NodeID: id,
1108-
Context: []byte(""),
1107+
if !n.CanRemoveMember(id) {
1108+
return ErrCannotRemoveMember
1109+
}
1110+
1111+
if id == n.Config.ID {
1112+
// To avoid a corner case with self-removal, transfer leadership
1113+
// to another node and ask that node to remove us.
1114+
if err := n.removeSelfGracefully(ctx); err != nil {
1115+
log.G(ctx).WithError(err).Error("failed to leave raft cluster gracefully")
1116+
} else {
1117+
return nil
11091118
}
1110-
err := n.configure(ctx, cc)
1119+
}
1120+
1121+
cc := raftpb.ConfChange{
1122+
ID: id,
1123+
Type: raftpb.ConfChangeRemoveNode,
1124+
NodeID: id,
1125+
Context: []byte(""),
1126+
}
1127+
return n.configure(ctx, cc)
1128+
}
1129+
1130+
func (n *Node) removeSelfGracefully(ctx context.Context) error {
1131+
transferCtx, cancelTransfer := context.WithTimeout(ctx, 10*time.Second)
1132+
defer cancelTransfer()
1133+
1134+
if err := n.transferLeadership(transferCtx); err != nil {
1135+
return errors.Wrap(err, "failed to transfer leadership")
1136+
}
1137+
conn, err := n.transport.PeerConn(n.leader())
1138+
if err != nil {
11111139
return err
11121140
}
1141+
_, err = api.NewRaftMembershipClient(conn).Leave(ctx, &api.LeaveRequest{Node: &api.RaftMember{RaftID: n.Config.ID}})
1142+
return err
1143+
}
11131144

1114-
return ErrCannotRemoveMember
1145+
// transferLeadership attempts to transfer leadership to a different node,
1146+
// and wait for the transfer to happen. It must be called with stopMu held.
1147+
func (n *Node) transferLeadership(ctx context.Context) error {
1148+
transferee, err := n.transport.LongestActive()
1149+
if err != nil {
1150+
return errors.Wrap(err, "failed to get longest-active member")
1151+
}
1152+
start := time.Now()
1153+
log.G(ctx).Infof("raft: transfer leadership %x -> %x", n.Config.ID, transferee)
1154+
n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
1155+
ticker := time.NewTicker(100 * time.Millisecond)
1156+
defer ticker.Stop()
1157+
for {
1158+
leader := n.leader()
1159+
if leader != 0 && leader != n.Config.ID {
1160+
break
1161+
}
1162+
select {
1163+
case <-ctx.Done():
1164+
return errors.Wrap(err, "timed out waiting for leadership change")
1165+
case <-ticker.C:
1166+
}
1167+
}
1168+
log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, transferee, time.Since(start))
1169+
return nil
11151170
}
11161171

11171172
// RemoveMember submits a configuration change to remove a member from the raft cluster
@@ -1738,7 +1793,6 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
17381793

17391794
// while n.asyncTasks.Wait() could be helpful in this case
17401795
// it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds)
1741-
// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
17421796
} else {
17431797
if err := n.transport.RemovePeer(cc.NodeID); err != nil {
17441798
return err

manager/state/raft/transport/transport.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool {
295295
return active
296296
}
297297

298+
// LongestActive returns the ID of the peer that has been active for the longest
299+
// length of time.
300+
func (t *Transport) LongestActive() (uint64, error) {
301+
p, err := t.longestActive()
302+
if err != nil {
303+
return 0, err
304+
}
305+
306+
return p.id, nil
307+
}
308+
309+
// longestActive returns the peer that has been active for the longest length of
310+
// time.
298311
func (t *Transport) longestActive() (*peer, error) {
299312
var longest *peer
300313
var longestTime time.Time

0 commit comments

Comments
 (0)