Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ var (
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
errEntryStoreFailed = errors.New("raft: could not store entry to WAL")
errNodeClosed = errors.New("raft: node is closed")
errNodeRemoved = errors.New("raft: peer was removed")
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
errNoSnapAvailable = errors.New("raft: no snapshot available")
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
Expand Down Expand Up @@ -3147,8 +3148,19 @@ func (n *raft) applyCommit(index uint64) error {

n.commit = index
ae.buf = nil

var committed []*Entry

defer func() {
// Pass to the upper layers if we have normal entries. It is
// entirely possible that 'committed' might be an empty slice here,
// which will happen if we've processed updates inline (like peer
// states). In which case the upper layer will just call down with
// Applied() with no further action.
n.apply.push(newCommittedEntry(index, committed))
// Place back in the pool.
ae.returnToPool()
}()

for _, e := range ae.entries {
switch e.Type {
case EntryNormal:
Expand Down Expand Up @@ -3199,6 +3211,9 @@ func (n *raft) applyCommit(index uint64) error {
// We pass these up as well.
committed = append(committed, e)

// We are done with this membership change
n.membChanging = false

case EntryRemovePeer:
peer := string(e.Data)
n.debug("Removing peer %q", peer)
Expand All @@ -3217,29 +3232,22 @@ func (n *raft) applyCommit(index uint64) error {
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
}

// If this is us and we are the leader we should attempt to stepdown.
if peer == n.id && n.State() == Leader {
n.stepdownLocked(n.selectNextLeader())
}

// Remove from string intern map.
peers.Delete(peer)

// We pass these up as well.
committed = append(committed, e)
}
if e.ChangesMembership() {

// We are done with this membership change
n.membChanging = false

// If this is us and we are the leader signal the caller
// to attempt to stepdown.
if peer == n.id && n.State() == Leader {
return errNodeRemoved
}
}
}
// Pass to the upper layers if we have normal entries. It is
// entirely possible that 'committed' might be an empty slice here,
// which will happen if we've processed updates inline (like peer
// states). In which case the upper layer will just call down with
// Applied() with no further action.
n.apply.push(newCommittedEntry(index, committed))
// Place back in the pool.
ae.returnToPool()
return nil
}

Expand Down Expand Up @@ -3291,19 +3299,36 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
}
results[ar.peer] = struct{}{}

// We don't count ourselves to account for leader changes, so add 1.
if nr := len(results); nr+1 >= n.qn {
acks := len(results)
if n.peers[n.ID()] != nil {
// Count the leader if it's still part of membership
acks += 1
}

var applyErr error
if acks >= n.qn {
// We have a quorum.
for index := n.commit + 1; index <= ar.index; index++ {
if err := n.applyCommit(index); err != nil && err != errNodeClosed {
n.error("Got an error applying commit for %d: %v", index, err)
if applyErr = n.applyCommit(index); applyErr != nil {
switch applyErr {
case errNodeClosed, errNodeRemoved:
default:
n.error("Got an error applying commit for %d: %v", index, applyErr)
}
break
}
}
sendHB = n.prop.len() == 0
}
n.Unlock()

if applyErr == errNodeRemoved {
// Leader was peer-removed. Attempt a step-down to
// a new leader before shutting down.
n.StepDown()
n.Stop()
}

if sendHB {
n.sendHeartbeat()
}
Expand Down
55 changes: 55 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4289,3 +4289,58 @@ func TestNRGProposeRemovePeerQuorum(t *testing.T) {
}
})
}

// Test outline:
// - In a R3 cluster, PeerRemove the leader and block one of
// the followers.
// - Verify that the membership change can't make progress;
// the leader should not count its own ack towards quorum.
// - Release the previously blocked follower and expect the
// membership change to take place.
func TestNRGProposeRemovePeerLeader(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
	rg.waitOnLeader()

	leader := rg.leader().node()
	followers := rg.followers()
	leaderID := leader.ID()
	require_True(t, len(followers) == 2)

	// Block follower 0 by holding its raft lock, then propose
	// removing the leader itself from the group.
	followers[0].node().(*raft).Lock()
	err := leader.ProposeRemovePeer(leaderID)
	require_NoError(t, err)

	// With only one reachable follower the leader must not count
	// its own ack towards quorum, so the membership change should
	// not be able to make progress yet.
	time.Sleep(time.Second)
	require_True(t, leader.MembershipChangeInProgress())

	// Unlock the follower and expect the membership change to
	// eventually complete once quorum can be reached again.
	followers[0].node().(*raft).Unlock()
	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
		if leader.MembershipChangeInProgress() {
			return errors.New("membership still in progress")
		}
		return nil
	})

	// The removed peer was the leader, so it should step down and
	// a different node should take over leadership.
	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
		newLeader := rg.waitOnLeader()
		if newLeader.node().ID() == leaderID {
			return errors.New("leader has not changed yet")
		}
		return nil
	})

	newLeader := rg.waitOnLeader()
	require_Equal(t, leader.State(), Closed)
	require_NotEqual(t, leader.ID(), newLeader.node().ID())
	require_Equal(t, len(newLeader.node().Peers()), 2)
	require_False(t, newLeader.node().MembershipChangeInProgress())
}