Skip to content

Commit d936b08

Browse files
NRG: Raft peer additions (#7632)
This PR makes the following changes: 1. Add test helpers so that we can exercise peer addition in unit tests 2. Fix a case where a partition and peer addition could lead to disjoint majorities Signed-off-by: Daniele Sciascia <daniele@nats.io>
2 parents 94dd262 + be7e595 commit d936b08

File tree

3 files changed

+211
-54
lines changed

3 files changed

+211
-54
lines changed

server/raft.go

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -951,21 +951,6 @@ func (n *raft) ProposeAddPeer(peer string) error {
951951
return nil
952952
}
953953

954-
// Remove the peer with the given id from our membership,
955-
// and adjusts cluster size and quorum accordingly.
956-
// Lock should be held.
957-
func (n *raft) removePeer(peer string) {
958-
if n.removed == nil {
959-
n.removed = map[string]time.Time{}
960-
}
961-
n.removed[peer] = time.Now()
962-
if _, ok := n.peers[peer]; ok {
963-
delete(n.peers, peer)
964-
n.adjustClusterSizeAndQuorum()
965-
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
966-
}
967-
}
968-
969954
// ProposeRemovePeer is called to remove a peer from the group.
970955
func (n *raft) ProposeRemovePeer(peer string) error {
971956
n.Lock()
@@ -2659,6 +2644,45 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account,
26592644
prop.push(newProposedEntry(newEntry(EntryNormal, msg), reply))
26602645
}
26612646

2647+
// Adds peer with the given id to our membership,
2648+
// and adjusts cluster size and quorum accordingly.
2649+
// Lock should be held.
2650+
func (n *raft) addPeer(peer string) {
2651+
// If we were on the removed list reverse that here.
2652+
if n.removed != nil {
2653+
delete(n.removed, peer)
2654+
}
2655+
2656+
if lp, ok := n.peers[peer]; !ok {
2657+
// We are not tracking this one automatically so we need
2658+
// to bump cluster size.
2659+
n.peers[peer] = &lps{time.Time{}, 0, true}
2660+
} else {
2661+
// Mark as added.
2662+
lp.kp = true
2663+
}
2664+
2665+
// Adjust cluster size and quorum if needed.
2666+
n.adjustClusterSizeAndQuorum()
2667+
// Write out our new state.
2668+
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
2669+
}
2670+
2671+
// Remove the peer with the given id from our membership,
2672+
// and adjusts cluster size and quorum accordingly.
2673+
// Lock should be held.
2674+
func (n *raft) removePeer(peer string) {
2675+
if n.removed == nil {
2676+
n.removed = map[string]time.Time{}
2677+
}
2678+
n.removed[peer] = time.Now()
2679+
if _, ok := n.peers[peer]; ok {
2680+
delete(n.peers, peer)
2681+
n.adjustClusterSizeAndQuorum()
2682+
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
2683+
}
2684+
}
2685+
26622686
// Build and send appendEntry request for the given entry that changes
26632687
// membership (EntryAddPeer / EntryRemovePeer).
26642688
// Returns true if the entry made it to the WAL and was sent to the followers
@@ -2677,6 +2701,10 @@ func (n *raft) sendMembershipChange(e *Entry) bool {
26772701
return false
26782702
}
26792703

2704+
if e.Type == EntryAddPeer {
2705+
n.addPeer(string(e.Data))
2706+
}
2707+
26802708
if e.Type == EntryRemovePeer {
26812709
n.removePeer(string(e.Data))
26822710
if n.csz == 1 {
@@ -3202,22 +3230,8 @@ func (n *raft) applyCommit(index uint64) error {
32023230
// Store our peer in our global peer map for all peers.
32033231
peers.LoadOrStore(newPeer, newPeer)
32043232

3205-
// If we were on the removed list reverse that here.
3206-
if n.removed != nil {
3207-
delete(n.removed, newPeer)
3208-
}
3233+
n.addPeer(newPeer)
32093234

3210-
if lp, ok := n.peers[newPeer]; !ok {
3211-
// We are not tracking this one automatically so we need to bump cluster size.
3212-
n.peers[newPeer] = &lps{time.Time{}, 0, true}
3213-
} else {
3214-
// Mark as added.
3215-
lp.kp = true
3216-
}
3217-
// Adjust cluster size and quorum if needed.
3218-
n.adjustClusterSizeAndQuorum()
3219-
// Write out our new state.
3220-
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
32213235
// We pass these up as well.
32223236
committed = append(committed, e)
32233237

@@ -3363,8 +3377,6 @@ func (n *raft) trackPeer(peer string) error {
33633377
}
33643378
if ps := n.peers[peer]; ps != nil {
33653379
ps.ts = time.Now()
3366-
} else if !isRemoved {
3367-
n.peers[peer] = &lps{time.Now(), 0, false}
33683380
}
33693381
n.Unlock()
33703382

server/raft_helpers_test.go

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,31 @@ func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory,
143143
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
144144
}
145145

146-
func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
146+
func (c *cluster) createWAL(name string, st StorageType) WAL {
147147
c.t.Helper()
148+
var err error
149+
var store WAL
150+
if st == FileStorage {
151+
store, err = newFileStore(
152+
FileStoreConfig{
153+
StoreDir: c.t.TempDir(),
154+
BlockSize: defaultMediumBlockSize,
155+
AsyncFlush: false,
156+
SyncInterval: 5 * time.Minute},
157+
StreamConfig{
158+
Name: name,
159+
Storage: FileStorage})
160+
} else {
161+
store, err = newMemStore(
162+
&StreamConfig{
163+
Name: name,
164+
Storage: MemoryStorage})
165+
}
166+
require_NoError(c.t, err)
167+
return store
168+
}
148169

149-
var sg smGroup
170+
func serverPeerNames(servers []*Server) []string {
150171
var peers []string
151172

152173
for _, s := range servers {
@@ -156,30 +177,56 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
156177
s.mu.RUnlock()
157178
}
158179

180+
return peers
181+
}
182+
183+
func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string, smf smFactory) stateMachine {
184+
s.bootstrapRaftNode(cfg, peers, true)
185+
n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
186+
require_NoError(c.t, err)
187+
sm := smf(s, cfg, n)
188+
go smLoop(sm)
189+
return sm
190+
}
191+
192+
func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
193+
c.t.Helper()
194+
195+
var sg smGroup
196+
peers := serverPeerNames(servers)
197+
159198
for _, s := range servers {
160-
var cfg *RaftConfig
161-
if st == FileStorage {
162-
fs, err := newFileStore(
163-
FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
164-
StreamConfig{Name: name, Storage: FileStorage},
165-
)
166-
require_NoError(c.t, err)
167-
cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
168-
} else {
169-
ms, err := newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage})
170-
require_NoError(c.t, err)
171-
cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: ms}
172-
}
173-
s.bootstrapRaftNode(cfg, peers, true)
174-
n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
175-
require_NoError(c.t, err)
176-
sm := smf(s, cfg, n)
177-
sg = append(sg, sm)
178-
go smLoop(sm)
199+
cfg := &RaftConfig{
200+
Name: name,
201+
Store: c.t.TempDir(),
202+
Log: c.createWAL(name, st)}
203+
sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
179204
}
180205
return sg
181206
}
182207

208+
func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
209+
c.t.Helper()
210+
211+
server := c.addInNewServer()
212+
213+
cfg := &RaftConfig{
214+
Name: name,
215+
Store: c.t.TempDir(),
216+
Log: c.createWAL(name, st)}
217+
218+
peers := serverPeerNames(c.servers)
219+
return c.createStateMachine(server, cfg, peers, smf)
220+
}
221+
222+
func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
223+
return c.addNodeEx(name, smf, FileStorage)
224+
}
225+
226+
func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
227+
return c.addNodeEx(name, smf, MemoryStorage)
228+
}
229+
183230
// Driver program for the state machine.
184231
// Should be run in its own go routine.
185232
func smLoop(sm stateMachine) {

server/raft_test.go

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2254,6 +2254,11 @@ func TestNRGQuorumAccounting(t *testing.T) {
22542254
nats1 := "yrzKKRBu" // "nats-1"
22552255
nats2 := "cnrtt3eg" // "nats-2"
22562256

2257+
n.Lock()
2258+
n.addPeer(nats1)
2259+
n.addPeer(nats2)
2260+
n.Unlock()
2261+
22572262
// Timeline
22582263
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
22592264
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
@@ -2283,6 +2288,11 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
22832288
nats1 := "yrzKKRBu" // "nats-1"
22842289
nats2 := "cnrtt3eg" // "nats-2"
22852290

2291+
n.Lock()
2292+
n.addPeer(nats1)
2293+
n.addPeer(nats2)
2294+
n.Unlock()
2295+
22862296
// Timeline
22872297
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
22882298
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
@@ -3648,6 +3658,8 @@ func TestNRGLostQuorum(t *testing.T) {
36483658
n, cleanup := initSingleMemRaftNode(t)
36493659
defer cleanup()
36503660

3661+
nats0 := "S1Nunr6R" // "nats-0"
3662+
36513663
require_Equal(t, n.State(), Follower)
36523664
require_False(t, n.Quorum())
36533665
require_True(t, n.lostQuorum())
@@ -3659,7 +3671,7 @@ func TestNRGLostQuorum(t *testing.T) {
36593671
// Respond to a vote request.
36603672
sub, err := nc.Subscribe(n.vsubj, func(m *nats.Msg) {
36613673
req := decodeVoteRequest(m.Data, m.Reply)
3662-
resp := voteResponse{term: req.term, peer: "random", granted: true}
3674+
resp := voteResponse{term: req.term, peer: nats0, granted: true}
36633675
m.Respond(resp.encode())
36643676
})
36653677
require_NoError(t, err)
@@ -3672,6 +3684,10 @@ func TestNRGLostQuorum(t *testing.T) {
36723684
require_False(t, n.Quorum())
36733685
require_True(t, n.lostQuorum())
36743686

3687+
n.Lock()
3688+
n.addPeer(nats0)
3689+
n.Unlock()
3690+
36753691
n.runAsCandidate()
36763692
require_Equal(t, n.State(), Leader)
36773693
require_True(t, n.Quorum())
@@ -3816,6 +3832,11 @@ func TestNRGQuorumAfterLeaderStepdown(t *testing.T) {
38163832
nats0 := "S1Nunr6R" // "nats-0"
38173833
nats1 := "yrzKKRBu" // "nats-1"
38183834

3835+
n.Lock()
3836+
n.addPeer(nats0)
3837+
n.addPeer(nats1)
3838+
n.Unlock()
3839+
38193840
// Become leader.
38203841
n.switchToCandidate()
38213842
n.switchToLeader()
@@ -4411,3 +4432,80 @@ func TestNRGLeaderResurrectsRemovedPeers(t *testing.T) {
44114432
followers[1].restart()
44124433
require_Equal(t, len(leader.node().Peers()), 2)
44134434
}
4435+
4436+
func TestNRGAddPeers(t *testing.T) {
4437+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4438+
defer c.shutdown()
4439+
4440+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
4441+
leader := rg.waitOnLeader()
4442+
4443+
require_Equal(t, leader.node().ClusterSize(), 3)
4444+
4445+
for range 6 {
4446+
rg = append(rg, c.addMemRaftNode("TEST", newStateAdder))
4447+
}
4448+
4449+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4450+
if leader.node().ClusterSize() != 9 {
4451+
return errors.New("node additions still in progress")
4452+
}
4453+
return nil
4454+
})
4455+
4456+
require_Equal(t, leader.node().ClusterSize(), 9)
4457+
}
4458+
4459+
func TestNRGDisjointMajorities(t *testing.T) {
4460+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4461+
defer c.shutdown()
4462+
4463+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
4464+
rg.waitOnLeader()
4465+
4466+
leader := rg.leader()
4467+
followers := rg.followers()
4468+
require_Equal(t, len(followers), 2)
4469+
4470+
// Lock all followers, a majority.
4471+
locked := rg.lockFollowers()
4472+
require_Equal(t, len(locked), 2)
4473+
4474+
defer func() {
4475+
for _, l := range locked {
4476+
l.node().(*raft).Unlock()
4477+
}
4478+
}()
4479+
4480+
// Add one node (cluster size is 4)
4481+
c.addMemRaftNode("TEST", newStateAdder)
4482+
4483+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4484+
if leader.node().ClusterSize() != 4 {
4485+
return errors.New("node addition still in progress")
4486+
}
4487+
return nil
4488+
})
4489+
4490+
// Attempt another node addition
4491+
c.addMemRaftNode("TEST", newStateAdder)
4492+
4493+
// If bug is present:
4494+
// The leader is able to form a majority because it would
4495+
// add peers immediately, and readjust cluster size and
4496+
// quorum only after committing EntryAddPeer. This allowed
4497+
// the leader to commit the entries using only newly added
4498+
// nodes (the original followers are locked and not
4499+
// acknowledging the EntryAddPeer proposals!)
4500+
// This should never happen. In a real scenario the
4501+
// followers could be partitioned, and they actually have
4502+
// a majority... so they could diverge and we end up with
4503+
// two different histories.
4504+
//
4505+
// Here wait a little bit, and check that the leader is
4506+
// unable to make any progress.
4507+
time.Sleep(time.Second)
4508+
4509+
require_Equal(t, leader.node().ClusterSize(), 4)
4510+
require_Equal(t, leader.node().MembershipChangeInProgress(), true)
4511+
}

0 commit comments

Comments
 (0)