Skip to content

Commit d88e9e8

Browse files
committed
NRG: Fix leader peer removal
Revisit quorum counting in `trackPeer` so that the implicit ack from the leader is counted only if the node is still part of the membership. So the corresponding EntryPeerRemove message must be replicated to a majority of the new membership, not including the leader to be removed. When the leader commits an EntryPeerRemove message that removes the leader itself, attempt a proper step-down to a new leader, before stopping. Signed-off-by: Daniele Sciascia <daniele@nats.io>
1 parent 34b2c93 commit d88e9e8

File tree

2 files changed

+100
-20
lines changed

2 files changed

+100
-20
lines changed

server/raft.go

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ var (
306306
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
307307
errEntryStoreFailed = errors.New("raft: could not store entry to WAL")
308308
errNodeClosed = errors.New("raft: node is closed")
309+
errNodeRemoved = errors.New("raft: peer was removed")
309310
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
310311
errNoSnapAvailable = errors.New("raft: no snapshot available")
311312
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
@@ -3147,8 +3148,19 @@ func (n *raft) applyCommit(index uint64) error {
31473148

31483149
n.commit = index
31493150
ae.buf = nil
3150-
31513151
var committed []*Entry
3152+
3153+
defer func() {
3154+
// Pass to the upper layers if we have normal entries. It is
3155+
// entirely possible that 'committed' might be an empty slice here,
3156+
// which will happen if we've processed updates inline (like peer
3157+
// states). In which case the upper layer will just call down with
3158+
// Applied() with no further action.
3159+
n.apply.push(newCommittedEntry(index, committed))
3160+
// Place back in the pool.
3161+
ae.returnToPool()
3162+
}()
3163+
31523164
for _, e := range ae.entries {
31533165
switch e.Type {
31543166
case EntryNormal:
@@ -3199,6 +3211,9 @@ func (n *raft) applyCommit(index uint64) error {
31993211
// We pass these up as well.
32003212
committed = append(committed, e)
32013213

3214+
// We are done with this membership change
3215+
n.membChanging = false
3216+
32023217
case EntryRemovePeer:
32033218
peer := string(e.Data)
32043219
n.debug("Removing peer %q", peer)
@@ -3217,29 +3232,22 @@ func (n *raft) applyCommit(index uint64) error {
32173232
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
32183233
}
32193234

3220-
// If this is us and we are the leader we should attempt to stepdown.
3221-
if peer == n.id && n.State() == Leader {
3222-
n.stepdownLocked(n.selectNextLeader())
3223-
}
3224-
32253235
// Remove from string intern map.
32263236
peers.Delete(peer)
32273237

32283238
// We pass these up as well.
32293239
committed = append(committed, e)
3230-
}
3231-
if e.ChangesMembership() {
3240+
3241+
// We are done with this membership change
32323242
n.membChanging = false
3243+
3244+
// If this is us and we are the leader signal the caller
3245+
// to attempt to stepdown.
3246+
if peer == n.id && n.State() == Leader {
3247+
return errNodeRemoved
3248+
}
32333249
}
32343250
}
3235-
// Pass to the upper layers if we have normal entries. It is
3236-
// entirely possible that 'committed' might be an empty slice here,
3237-
// which will happen if we've processed updates inline (like peer
3238-
// states). In which case the upper layer will just call down with
3239-
// Applied() with no further action.
3240-
n.apply.push(newCommittedEntry(index, committed))
3241-
// Place back in the pool.
3242-
ae.returnToPool()
32433251
return nil
32443252
}
32453253

@@ -3291,19 +3299,36 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
32913299
}
32923300
results[ar.peer] = struct{}{}
32933301

3294-
// We don't count ourselves to account for leader changes, so add 1.
3295-
if nr := len(results); nr+1 >= n.qn {
3302+
acks := len(results)
3303+
if n.peers[n.ID()] != nil {
3304+
// Count the leader if it's still part of membership
3305+
acks += 1
3306+
}
3307+
3308+
var applyErr error
3309+
if acks >= n.qn {
32963310
// We have a quorum.
32973311
for index := n.commit + 1; index <= ar.index; index++ {
3298-
if err := n.applyCommit(index); err != nil && err != errNodeClosed {
3299-
n.error("Got an error applying commit for %d: %v", index, err)
3312+
if applyErr = n.applyCommit(index); applyErr != nil {
3313+
switch applyErr {
3314+
case errNodeClosed, errNodeRemoved:
3315+
default:
3316+
n.error("Got an error applying commit for %d: %v", index, applyErr)
3317+
}
33003318
break
33013319
}
33023320
}
33033321
sendHB = n.prop.len() == 0
33043322
}
33053323
n.Unlock()
33063324

3325+
if applyErr == errNodeRemoved {
3326+
// Leader was peer-removed. Attempt a step-down to
3327+
// a new leader before shutting down.
3328+
n.StepDown()
3329+
n.Stop()
3330+
}
3331+
33073332
if sendHB {
33083333
n.sendHeartbeat()
33093334
}

server/raft_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4289,3 +4289,58 @@ func TestNRGProposeRemovePeerQuorum(t *testing.T) {
42894289
}
42904290
})
42914291
}
4292+
4293+
// Test outline:
4294+
// - In a R3 cluster, PeerRemove the leader and block one of
4295+
// the followers.
4296+
// - Verify that the membership change can't make progress,
4297+
// the leader should not count its own ack towards quorum.
4298+
// - Release the previously blocked follower and expect the
4299+
// membership change to take place.
4300+
func TestNRGProposeRemovePeerLeader(t *testing.T) {
4301+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4302+
defer c.shutdown()
4303+
4304+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
4305+
rg.waitOnLeader()
4306+
4307+
leader := rg.leader().node()
4308+
followers := rg.followers()
4309+
leaderID := leader.ID()
4310+
require_True(t, len(followers) == 2)
4311+
4312+
// Block follower 0 and remove the leader
4313+
followers[0].node().(*raft).Lock()
4314+
err := leader.ProposeRemovePeer(leader.ID())
4315+
require_NoError(t, err)
4316+
4317+
// Should not be able to make progress
4318+
time.Sleep(time.Second)
4319+
require_True(t, leader.MembershipChangeInProgress())
4320+
4321+
// Unlock the follower and expect the membership
4322+
// change to eventually finish
4323+
followers[0].node().(*raft).Unlock()
4324+
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
4325+
if leader.MembershipChangeInProgress() {
4326+
return errors.New("membership still in progress")
4327+
} else {
4328+
return nil
4329+
}
4330+
})
4331+
4332+
// Old leader steps down
4333+
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
4334+
newLeader := rg.waitOnLeader()
4335+
if newLeader.node().ID() == leaderID {
4336+
return errors.New("leader has not changed yet")
4337+
}
4338+
return nil
4339+
})
4340+
4341+
newLeader := rg.waitOnLeader()
4342+
require_Equal(t, leader.State(), Closed)
4343+
require_NotEqual(t, leader.ID(), newLeader.node().ID())
4344+
require_Equal(t, len(newLeader.node().Peers()), 2)
4345+
require_False(t, newLeader.node().MembershipChangeInProgress())
4346+
}

0 commit comments

Comments
 (0)