Skip to content

Commit 2939985

Browse files
NRG: do not count messages from removed peers towards quorum (#7589)
Ignore `appendEntryResponse` from removed peers. Signed-off-by: Daniele Sciascia <daniele@nats.io>
2 parents f89d133 + efabcbf commit 2939985

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

server/raft.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3257,8 +3257,10 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
32573257
return
32583258
}
32593259

3260+
ps := n.peers[ar.peer]
3261+
32603262
// Update peer's last index.
3261-
if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li {
3263+
if ps != nil && ar.index > ps.li {
32623264
ps.li = ar.index
32633265
}
32643266

@@ -3273,6 +3275,12 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
32733275
return
32743276
}
32753277

3278+
// Not a peer, can't count this message towards quorum
3279+
if ps == nil {
3280+
n.Unlock()
3281+
return
3282+
}
3283+
32763284
// See if we have items to apply.
32773285
var sendHB bool
32783286

server/raft_helpers_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,17 @@ func (sg smGroup) leader() stateMachine {
5757
return nil
5858
}
5959

60+
func (sg smGroup) followers() []stateMachine {
61+
var f []stateMachine
62+
for _, sm := range sg {
63+
if sm.node().Leader() {
64+
continue
65+
}
66+
f = append(f, sm)
67+
}
68+
return f
69+
}
70+
6071
// Wait on a leader to be elected.
6172
func (sg smGroup) waitOnLeader() stateMachine {
6273
expires := time.Now().Add(10 * time.Second)

server/raft_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4256,3 +4256,36 @@ func TestNRGUncommittedMembershipChangeOnNewLeader(t *testing.T) {
42564256
err := n.ProposeRemovePeer(nats1)
42574257
require_Error(t, err, errMembershipChange)
42584258
}
4259+
4260+
func TestNRGProposeRemovePeerQuorum(t *testing.T) {
4261+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4262+
defer c.shutdown()
4263+
4264+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
4265+
rg.waitOnLeader()
4266+
4267+
leader := rg.leader().node()
4268+
followers := rg.followers()
4269+
4270+
require_True(t, len(followers) == 2)
4271+
4272+
// Block follower 0 and remove follower 1
4273+
followers[0].node().(*raft).Lock()
4274+
err := leader.ProposeRemovePeer(followers[1].node().ID())
4275+
require_NoError(t, err)
4276+
4277+
// Should not be able to make progress
4278+
time.Sleep(time.Second)
4279+
require_True(t, leader.MembershipChangeInProgress())
4280+
4281+
// Unlock the other follower and expect the membership
4282+
// change to eventually finish
4283+
followers[0].node().(*raft).Unlock()
4284+
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
4285+
if leader.MembershipChangeInProgress() {
4286+
return errors.New("membership still in progress")
4287+
} else {
4288+
return nil
4289+
}
4290+
})
4291+
}

0 commit comments

Comments
 (0)