Skip to content

Commit 22899a5

Browse files
NRG: Mark membChanging when sending
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent eac92e5 commit 22899a5

File tree

2 files changed

+37
-34
lines changed

2 files changed

+37
-34
lines changed

server/raft.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -943,61 +943,59 @@ func (n *raft) ForwardProposal(entry []byte) error {
943943

944944
// ProposeAddPeer is called to add a peer to the group.
945945
func (n *raft) ProposeAddPeer(peer string) error {
946-
n.Lock()
946+
n.RLock()
947947
// Check state under lock, we might not be leader anymore.
948948
if n.State() != Leader {
949-
n.Unlock()
949+
n.RUnlock()
950950
return errNotLeader
951951
}
952952
// Error if we had a previous write error.
953953
if werr := n.werr; werr != nil {
954-
n.Unlock()
954+
n.RUnlock()
955955
return werr
956956
}
957957
if n.membChanging {
958-
n.Unlock()
958+
n.RUnlock()
959959
return errMembershipChange
960960
}
961961
prop := n.prop
962-
n.membChanging = true
963-
n.Unlock()
962+
n.RUnlock()
964963

965964
prop.push(newProposedEntry(newEntry(EntryAddPeer, []byte(peer)), _EMPTY_))
966965
return nil
967966
}
968967

969968
// ProposeRemovePeer is called to remove a peer from the group.
970969
func (n *raft) ProposeRemovePeer(peer string) error {
971-
n.Lock()
970+
n.RLock()
972971

973972
// Error if we had a previous write error.
974973
if werr := n.werr; werr != nil {
975-
n.Unlock()
974+
n.RUnlock()
976975
return werr
977976
}
978977

979978
if n.State() != Leader {
980979
subj := n.rpsubj
981-
n.Unlock()
980+
n.RUnlock()
982981

983982
// Forward the proposal to the leader
984983
n.sendRPC(subj, _EMPTY_, []byte(peer))
985984
return nil
986985
}
987986

988987
if n.membChanging {
989-
n.Unlock()
988+
n.RUnlock()
990989
return errMembershipChange
991990
}
992991

993992
if len(n.peers) <= 1 {
994-
n.Unlock()
993+
n.RUnlock()
995994
return errRemoveLastNode
996995
}
997996

998997
prop := n.prop
999-
n.membChanging = true
1000-
n.Unlock()
998+
n.RUnlock()
1001999

10021000
prop.push(newProposedEntry(newEntry(EntryRemovePeer, []byte(peer)), _EMPTY_))
10031001
return nil
@@ -2622,26 +2620,25 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _
26222620
return
26232621
}
26242622

2625-
n.Lock()
2623+
n.RLock()
26262624
// Check state under lock, we might not be leader anymore.
26272625
if n.State() != Leader || !n.leaderState.Load() {
26282626
n.debug("Ignoring forwarded peer removal proposal, not leader")
2629-
n.Unlock()
2627+
n.RUnlock()
26302628
return
26312629
}
26322630
// Error if we had a previous write error.
26332631
if werr := n.werr; werr != nil {
2634-
n.Unlock()
2632+
n.RUnlock()
26352633
return
26362634
}
26372635
if n.membChanging {
26382636
n.debug("Ignoring forwarded peer removal proposal, membership changing")
2639-
n.Unlock()
2637+
n.RUnlock()
26402638
return
26412639
}
26422640
prop := n.prop
2643-
n.membChanging = true
2644-
n.Unlock()
2641+
n.RUnlock()
26452642

26462643
// Need to copy since this is underlying client/route buffer.
26472644
peer := copyBytes(msg)
@@ -2717,11 +2714,13 @@ func (n *raft) sendMembershipChange(e *Entry) bool {
27172714
n.Lock()
27182715
defer n.Unlock()
27192716

2720-
// Only makes sense to call this with entries that change membership
2721-
if !e.ChangesMembership() {
2717+
// Only makes sense to call this with entries that change membership.
2718+
// Also, ignore if we're already changing membership.
2719+
if !e.ChangesMembership() || n.membChanging {
27222720
return false
27232721
}
27242722

2723+
n.membChanging = true
27252724
err := n.sendAppendEntryLocked([]*Entry{e}, true)
27262725
if err != nil {
27272726
n.membChanging = false

server/raft_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4380,7 +4380,13 @@ func TestNRGProposeRemovePeerConcurrent(t *testing.T) {
43804380
require_NoError(t, err)
43814381

43824382
// Check that membership change is in progress.
4383-
require_True(t, n.MembershipChangeInProgress())
4383+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
4384+
if n.MembershipChangeInProgress() {
4385+
return nil
4386+
} else {
4387+
return errors.New("membership not in progress")
4388+
}
4389+
})
43844390

43854391
// Attempt to remove the second follower, should fail.
43864392
err = n.ProposeRemovePeer(locked[1].node().ID())
@@ -4627,18 +4633,19 @@ func TestNRGProposeRemovePeerAll(t *testing.T) {
46274633
followers := rg.followers()
46284634
require_Equal(t, len(followers), 2)
46294635

4630-
for _, follower := range followers {
4636+
peers := leader.node().Peers()
4637+
require_Equal(t, len(peers), 3)
4638+
for i, follower := range followers {
46314639
require_NoError(t, leader.node().ProposeRemovePeer(follower.node().ID()))
4632-
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4633-
if leader.node().MembershipChangeInProgress() {
4634-
return errors.New("membership still in progress")
4635-
} else {
4640+
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
4641+
if peers = leader.node().Peers(); len(peers) == 2-i {
46364642
return nil
46374643
}
4644+
return errors.New("membership still in progress")
46384645
})
46394646
}
46404647

4641-
peers := leader.node().Peers()
4648+
peers = leader.node().Peers()
46424649
leaderID := leader.node().ID()
46434650

46444651
// The leader is the only one left...
@@ -4661,16 +4668,13 @@ func TestNRGLeaderResurrectsRemovedPeers(t *testing.T) {
46614668

46624669
// Remove one follower
46634670
require_NoError(t, leader.node().ProposeRemovePeer(followers[0].node().ID()))
4664-
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4665-
if leader.node().MembershipChangeInProgress() {
4666-
return errors.New("membership still in progress")
4667-
} else {
4671+
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
4672+
if peers := leader.node().Peers(); len(peers) == 2 {
46684673
return nil
46694674
}
4675+
return errors.New("membership still in progress")
46704676
})
46714677

4672-
require_Equal(t, len(leader.node().Peers()), 2)
4673-
46744678
// Stop the leader and restart it.
46754679
// If bug is present: the leader resurrects the previously removed peer.
46764680
leader.stop()

0 commit comments

Comments
 (0)