Skip to content

Commit a82deb6

Browse files
authored
Merge pull request #1939 from aaronlehmann/transferleadership
raft: Use TransferLeadership to make leader demotion safer
2 parents ffcdb88 + 5470e07 commit a82deb6

File tree

4 files changed

+90
-41
lines changed

4 files changed

+90
-41
lines changed

manager/controlapi/node_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ func TestUpdateNode(t *testing.T) {
532532
assert.Error(t, err)
533533
}
534534

535-
func testUpdateNodeDemote(leader bool, t *testing.T) {
535+
func testUpdateNodeDemote(t *testing.T) {
536536
tc := cautils.NewTestCA(nil)
537537
defer tc.Stop()
538538
ts := newTestServer(t)
@@ -654,14 +654,8 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
654654
return nil
655655
}))
656656

657-
var demoteNode, lastNode *raftutils.TestNode
658-
if leader {
659-
demoteNode = nodes[1]
660-
lastNode = nodes[2]
661-
} else {
662-
demoteNode = nodes[2]
663-
lastNode = nodes[1]
664-
}
657+
demoteNode := nodes[2]
658+
lastNode := nodes[1]
665659

666660
raftMember = ts.Server.raft.GetMemberByNodeID(demoteNode.SecurityConfig.ClientTLSCreds.NodeID())
667661
assert.NotNil(t, raftMember)
@@ -734,10 +728,5 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
734728

735729
func TestUpdateNodeDemote(t *testing.T) {
736730
t.Parallel()
737-
testUpdateNodeDemote(false, t)
738-
}
739-
740-
func TestUpdateNodeDemoteLeader(t *testing.T) {
741-
t.Parallel()
742-
testUpdateNodeDemote(true, t)
731+
testUpdateNodeDemote(t)
743732
}

manager/role_manager.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,21 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
136136
rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second)
137137
defer rmCancel()
138138

139+
if member.RaftID == rm.raft.Config.ID {
140+
// Don't use rmCtx, because we expect to lose
141+
// leadership, which will cancel this context.
142+
log.L.Info("demoted; transferring leadership")
143+
err := rm.raft.TransferLeadership(context.Background())
144+
if err == nil {
145+
return
146+
}
147+
log.L.WithError(err).Info("failed to transfer leadership")
148+
}
139149
if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
140150
// TODO(aaronl): Retry later
141151
log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
142-
return
143152
}
153+
return
144154
}
145155

146156
err := rm.store.Update(func(tx store.Tx) error {

manager/state/raft/raft.go

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
412412
defer conn.Close()
413413
client := api.NewRaftMembershipClient(conn)
414414

415-
joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
415+
joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
416416
defer joinCancel()
417417
resp, err := client.Join(joinCtx, &api.JoinRequest{
418418
Addr: n.opts.Addr,
@@ -1030,6 +1030,10 @@ func (n *Node) UpdateNode(id uint64, addr string) {
10301030
// from a member who is willing to leave its raft
10311031
// membership to an active member of the raft
10321032
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
1033+
if req.Node == nil {
1034+
return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
1035+
}
1036+
10331037
nodeInfo, err := ca.RemoteNode(ctx)
10341038
if err != nil {
10351039
return nil, err
@@ -1100,18 +1104,58 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
11001104

11011105
n.membershipLock.Lock()
11021106
defer n.membershipLock.Unlock()
1103-
if n.CanRemoveMember(id) {
1104-
cc := raftpb.ConfChange{
1105-
ID: id,
1106-
Type: raftpb.ConfChangeRemoveNode,
1107-
NodeID: id,
1108-
Context: []byte(""),
1109-
}
1110-
err := n.configure(ctx, cc)
1111-
return err
1107+
if !n.CanRemoveMember(id) {
1108+
return ErrCannotRemoveMember
11121109
}
11131110

1114-
return ErrCannotRemoveMember
1111+
cc := raftpb.ConfChange{
1112+
ID: id,
1113+
Type: raftpb.ConfChangeRemoveNode,
1114+
NodeID: id,
1115+
Context: []byte(""),
1116+
}
1117+
return n.configure(ctx, cc)
1118+
}
1119+
1120+
// TransferLeadership attempts to transfer leadership to a different node,
1121+
// and waits for the transfer to happen.
1122+
func (n *Node) TransferLeadership(ctx context.Context) error {
1123+
ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
1124+
defer cancelTransfer()
1125+
1126+
n.stopMu.RLock()
1127+
defer n.stopMu.RUnlock()
1128+
1129+
if !n.IsMember() {
1130+
return ErrNoRaftMember
1131+
}
1132+
1133+
if !n.isLeader() {
1134+
return ErrLostLeadership
1135+
}
1136+
1137+
transferee, err := n.transport.LongestActive()
1138+
if err != nil {
1139+
return errors.Wrap(err, "failed to get longest-active member")
1140+
}
1141+
start := time.Now()
1142+
n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
1143+
ticker := time.NewTicker(n.opts.TickInterval / 10)
1144+
defer ticker.Stop()
1145+
var leader uint64
1146+
for {
1147+
leader = n.leader()
1148+
if leader != raft.None && leader != n.Config.ID {
1149+
break
1150+
}
1151+
select {
1152+
case <-ctx.Done():
1153+
return ctx.Err()
1154+
case <-ticker.C:
1155+
}
1156+
}
1157+
log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start))
1158+
return nil
11151159
}
11161160

11171161
// RemoveMember submits a configuration change to remove a member from the raft cluster
@@ -1726,23 +1770,12 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
17261770
}
17271771

17281772
if cc.NodeID == n.Config.ID {
1729-
// wait the commit ack to be sent before closing connection
1773+
// wait for the commit ack to be sent before closing connection
17301774
n.asyncTasks.Wait()
17311775

17321776
n.NodeRemoved()
1733-
// if there are only 2 nodes in the cluster, and leader is leaving
1734-
// before closing the connection, leader has to ensure that follower gets
1735-
// noticed about this raft conf change commit. Otherwise, follower would
1736-
// assume there are still 2 nodes in the cluster and won't get elected
1737-
// into the leader by acquiring the majority (2 nodes)
1738-
1739-
// while n.asyncTasks.Wait() could be helpful in this case
1740-
// it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds)
1741-
// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
1742-
} else {
1743-
if err := n.transport.RemovePeer(cc.NodeID); err != nil {
1744-
return err
1745-
}
1777+
} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
1778+
return err
17461779
}
17471780

17481781
return n.cluster.RemoveMember(cc.NodeID)
@@ -1852,3 +1885,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
18521885
}
18531886
return sids
18541887
}
1888+
1889+
func (n *Node) reqTimeout() time.Duration {
1890+
return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
1891+
}

manager/state/raft/transport/transport.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool {
295295
return active
296296
}
297297

298+
// LongestActive returns the ID of the peer that has been active for the longest
299+
// length of time.
300+
func (t *Transport) LongestActive() (uint64, error) {
301+
p, err := t.longestActive()
302+
if err != nil {
303+
return 0, err
304+
}
305+
306+
return p.id, nil
307+
}
308+
309+
// longestActive returns the peer that has been active for the longest length of
310+
// time.
298311
func (t *Transport) longestActive() (*peer, error) {
299312
var longest *peer
300313
var longestTime time.Time

0 commit comments

Comments
 (0)