Skip to content

Commit 2102261

Browse files
NRG: Track uncommitted membership entry index in membChanging
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent: 22899a5; commit: 2102261

File tree

2 files changed

+56
-19
lines changed

2 files changed

+56
-19
lines changed

server/raft.go

Lines changed: 26 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -171,13 +171,14 @@ type raft struct {
171171
llqrt time.Time // Last quorum lost time
172172
lsut time.Time // Last scale-up time
173173

174-
term uint64 // The current vote term
175-
pterm uint64 // Previous term from the last snapshot
176-
pindex uint64 // Previous index from the last snapshot
177-
commit uint64 // Index of the most recent commit
178-
processed uint64 // Index of the most recently processed commit
179-
applied uint64 // Index of the most recently applied commit
180-
papplied uint64 // First sequence of our log, matches when we last installed a snapshot.
174+
term uint64 // The current vote term
175+
pterm uint64 // Previous term from the last snapshot
176+
pindex uint64 // Previous index from the last snapshot
177+
commit uint64 // Index of the most recent commit
178+
processed uint64 // Index of the most recently processed commit
179+
applied uint64 // Index of the most recently applied commit
180+
papplied uint64 // First sequence of our log, matches when we last installed a snapshot.
181+
membChanging uint64 // Index of uncommitted membership change entry (0 means no change in progress)
181182

182183
aflr uint64 // Index when to signal initial messages have been applied after becoming leader. 0 means signaling is disabled.
183184

@@ -231,7 +232,6 @@ type raft struct {
231232
observer bool // The node is observing, i.e. not able to become leader
232233
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
233234
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
234-
membChanging bool // There is a membership change proposal in progress
235235
deleted bool // If the node was deleted.
236236
}
237237

@@ -954,7 +954,7 @@ func (n *raft) ProposeAddPeer(peer string) error {
954954
n.RUnlock()
955955
return werr
956956
}
957-
if n.membChanging {
957+
if n.membChanging > 0 {
958958
n.RUnlock()
959959
return errMembershipChange
960960
}
@@ -984,7 +984,7 @@ func (n *raft) ProposeRemovePeer(peer string) error {
984984
return nil
985985
}
986986

987-
if n.membChanging {
987+
if n.membChanging > 0 {
988988
n.RUnlock()
989989
return errMembershipChange
990990
}
@@ -1004,7 +1004,7 @@ func (n *raft) ProposeRemovePeer(peer string) error {
10041004
func (n *raft) MembershipChangeInProgress() bool {
10051005
n.RLock()
10061006
defer n.RUnlock()
1007-
return n.membChanging
1007+
return n.membChanging > 0
10081008
}
10091009

10101010
// ClusterSize reports back the total cluster size.
@@ -2632,7 +2632,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _
26322632
n.RUnlock()
26332633
return
26342634
}
2635-
if n.membChanging {
2635+
if n.membChanging > 0 {
26362636
n.debug("Ignoring forwarded peer removal proposal, membership changing")
26372637
n.RUnlock()
26382638
return
@@ -2716,16 +2716,16 @@ func (n *raft) sendMembershipChange(e *Entry) bool {
27162716

27172717
// Only makes sense to call this with entries that change membership.
27182718
// Also, ignore if we're already changing membership.
2719-
if !e.ChangesMembership() || n.membChanging {
2719+
if !e.ChangesMembership() || n.membChanging > 0 {
27202720
return false
27212721
}
27222722

2723-
n.membChanging = true
27242723
err := n.sendAppendEntryLocked([]*Entry{e}, true)
27252724
if err != nil {
2726-
n.membChanging = false
27272725
return false
27282726
}
2727+
// Set to the index where we stored the membership change
2728+
n.membChanging = n.pindex
27292729

27302730
if e.Type == EntryAddPeer {
27312731
n.addPeer(string(e.Data))
@@ -3227,7 +3227,7 @@ func (n *raft) applyCommit(index uint64) error {
32273227
committed = append(committed, e)
32283228

32293229
// We are done with this membership change
3230-
n.membChanging = false
3230+
n.membChanging = 0
32313231

32323232
case EntryRemovePeer:
32333233
peer := string(e.Data)
@@ -3242,7 +3242,7 @@ func (n *raft) applyCommit(index uint64) error {
32423242
committed = append(committed, e)
32433243

32443244
// We are done with this membership change
3245-
n.membChanging = false
3245+
n.membChanging = 0
32463246

32473247
// If this is us and we are the leader signal the caller
32483248
// to attempt to stepdown.
@@ -3589,6 +3589,11 @@ func (n *raft) truncateWAL(term, index uint64) {
35893589
}
35903590
// Set after we know we have truncated properly.
35913591
n.pterm, n.pindex = term, index
3592+
3593+
// Check if we're truncating an uncommitted membership change.
3594+
if n.membChanging > 0 && n.membChanging > index {
3595+
n.membChanging = 0
3596+
}
35923597
}
35933598

35943599
// Reset our WAL. This is equivalent to truncating all data from the log.
@@ -3958,7 +3963,8 @@ CONTINUE:
39583963
}
39593964
case EntryAddPeer:
39603965
// When receiving or restoring, mark membership as changing.
3961-
n.membChanging = true
3966+
// Set to the index where this entry was stored (pindex is now this entry's index)
3967+
n.membChanging = n.pindex
39623968
if newPeer := string(e.Data); len(newPeer) == idLen {
39633969
// Track directly, but wait for commit to be official
39643970
if _, ok := n.peers[newPeer]; !ok {
@@ -3969,7 +3975,8 @@ CONTINUE:
39693975
}
39703976
case EntryRemovePeer:
39713977
// When receiving or restoring, mark membership as changing.
3972-
n.membChanging = true
3978+
// Set to the index where this entry was stored (pindex is now this entry's index)
3979+
n.membChanging = n.pindex
39733980
}
39743981
}
39753982

server/raft_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4438,6 +4438,36 @@ func TestNRGUncommittedMembershipChangeOnNewLeader(t *testing.T) {
44384438
require_Error(t, err, errMembershipChange)
44394439
}
44404440

4441+
func TestNRGUncommittedMembershipChangeGetsTruncated(t *testing.T) {
4442+
n, cleanup := initSingleMemRaftNode(t)
4443+
defer cleanup()
4444+
4445+
nats1 := "yrzKKRBu" // "nats-1"
4446+
4447+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
4448+
entries := []*Entry{newEntry(EntryNormal, esm)}
4449+
aeMsg := encode(t, &appendEntry{leader: nats1, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
4450+
entries = []*Entry{newEntry(EntryAddPeer, []byte(nats1))}
4451+
aeAddPeer := encode(t, &appendEntry{leader: nats1, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})
4452+
4453+
// Set up a membership change.
4454+
n.processAppendEntry(aeMsg, n.aesub)
4455+
n.processAppendEntry(aeAddPeer, n.aesub)
4456+
require_Equal(t, n.pindex, 2)
4457+
require_True(t, n.MembershipChangeInProgress())
4458+
require_Equal(t, n.membChanging, 2)
4459+
4460+
// If the entry containing the membership change isn't truncated, it should remain in progress.
4461+
n.truncateWAL(n.pterm, n.pindex)
4462+
require_True(t, n.MembershipChangeInProgress())
4463+
require_Equal(t, n.membChanging, 2)
4464+
4465+
// If the entry IS truncated, then it shouldn't be in progress anymore.
4466+
n.truncateWAL(n.pterm, n.pindex-1)
4467+
require_False(t, n.MembershipChangeInProgress())
4468+
require_Equal(t, n.membChanging, 0)
4469+
}
4470+
44414471
func TestNRGUncommittedMembershipChangeOnNewLeaderForwardedRemovePeerProposal(t *testing.T) {
44424472
n, cleanup := initSingleMemRaftNode(t)
44434473
defer cleanup()

0 commit comments

Comments
 (0)