Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,21 +951,6 @@ func (n *raft) ProposeAddPeer(peer string) error {
return nil
}

// removePeer drops the given peer id from our membership and records it
// on the removed list so it is not silently re-added later. When the peer
// was actually tracked, cluster size and quorum are recalculated and the
// new peer state is persisted.
// Lock should be held.
func (n *raft) removePeer(peer string) {
	if n.removed == nil {
		n.removed = make(map[string]time.Time)
	}
	n.removed[peer] = time.Now()

	if _, tracked := n.peers[peer]; !tracked {
		return
	}
	delete(n.peers, peer)
	n.adjustClusterSizeAndQuorum()
	n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
}

// ProposeRemovePeer is called to remove a peer from the group.
func (n *raft) ProposeRemovePeer(peer string) error {
n.Lock()
Expand Down Expand Up @@ -2659,6 +2644,45 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account,
prop.push(newProposedEntry(newEntry(EntryNormal, msg), reply))
}

// addPeer registers the peer with the given id as a known member,
// reversing any prior removal, then recomputes cluster size and quorum
// and persists the resulting peer state.
// Lock should be held.
func (n *raft) addPeer(peer string) {
	// Reverse a prior removal, if any. Deleting from a nil map is a no-op.
	delete(n.removed, peer)

	if lp, tracked := n.peers[peer]; tracked {
		// Already tracked automatically, just mark as added.
		lp.kp = true
	} else {
		// Not tracked yet, start tracking so the cluster size gets bumped.
		n.peers[peer] = &lps{time.Time{}, 0, true}
	}

	// Recompute cluster size and quorum, then write out our new state.
	n.adjustClusterSizeAndQuorum()
	n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
}

// removePeer removes the peer with the given id from our membership,
// remembering it on the removed list, and — if the peer was tracked —
// adjusts cluster size and quorum and persists the new peer state.
// Lock should be held.
func (n *raft) removePeer(peer string) {
	if n.removed == nil {
		n.removed = make(map[string]time.Time)
	}
	n.removed[peer] = time.Now()

	_, tracked := n.peers[peer]
	if !tracked {
		return
	}
	delete(n.peers, peer)
	n.adjustClusterSizeAndQuorum()
	n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
}

// Build and send appendEntry request for the given entry that changes
// membership (EntryAddPeer / EntryRemovePeer).
// Returns true if the entry made it to the WAL and was sent to the followers
Expand All @@ -2677,6 +2701,10 @@ func (n *raft) sendMembershipChange(e *Entry) bool {
return false
}

if e.Type == EntryAddPeer {
n.addPeer(string(e.Data))
}

if e.Type == EntryRemovePeer {
n.removePeer(string(e.Data))
if n.csz == 1 {
Expand Down Expand Up @@ -3202,22 +3230,8 @@ func (n *raft) applyCommit(index uint64) error {
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)

// If we were on the removed list reverse that here.
if n.removed != nil {
delete(n.removed, newPeer)
}
n.addPeer(newPeer)

if lp, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
n.peers[newPeer] = &lps{time.Time{}, 0, true}
} else {
// Mark as added.
lp.kp = true
}
// Adjust cluster size and quorum if needed.
n.adjustClusterSizeAndQuorum()
// Write out our new state.
n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt})
// We pass these up as well.
committed = append(committed, e)

Expand Down Expand Up @@ -3363,8 +3377,6 @@ func (n *raft) trackPeer(peer string) error {
}
if ps := n.peers[peer]; ps != nil {
ps.ts = time.Now()
} else if !isRemoved {
n.peers[peer] = &lps{time.Now(), 0, false}
}
n.Unlock()

Expand Down
89 changes: 68 additions & 21 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,31 @@ func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory,
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
}

func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
// createWAL builds a write-ahead log for a raft group test using the
// requested storage type: a file store rooted in a fresh temp dir, or a
// memory store. Fails the test on any store-creation error.
func (c *cluster) createWAL(name string, st StorageType) WAL {
	c.t.Helper()

	var (
		store WAL
		err   error
	)
	switch st {
	case FileStorage:
		fcfg := FileStoreConfig{
			StoreDir:     c.t.TempDir(),
			BlockSize:    defaultMediumBlockSize,
			AsyncFlush:   false,
			SyncInterval: 5 * time.Minute,
		}
		store, err = newFileStore(fcfg, StreamConfig{Name: name, Storage: FileStorage})
	default:
		store, err = newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage})
	}
	require_NoError(c.t, err)
	return store
}

var sg smGroup
func serverPeerNames(servers []*Server) []string {
var peers []string

for _, s := range servers {
Expand All @@ -156,30 +177,56 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
s.mu.RUnlock()
}

return peers
}

// createStateMachine bootstraps a raft node on the given server with the
// provided config and peer set, wraps it in the state machine produced by
// smf, and starts the state machine's driver loop in its own goroutine.
// Fails the test if the raft node cannot be started.
func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string, smf smFactory) stateMachine {
	// Mark as helper for correct failure line attribution, consistent
	// with createWAL and createRaftGroupWithPeers.
	c.t.Helper()
	s.bootstrapRaftNode(cfg, peers, true)
	n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
	require_NoError(c.t, err)
	sm := smf(s, cfg, n)
	go smLoop(sm)
	return sm
}

// createRaftGroupWithPeers creates one raft node plus state machine per
// given server, all joined to the same named group, and returns the
// resulting group.
//
// NOTE(review): this span is a flattened unified-diff view — it contains
// BOTH the pre-refactor loop body (inline store construction) AND the
// refactored body (delegating to createWAL/createStateMachine). As plain
// Go it does not compile (cfg is declared twice in the loop); only the
// trailing half of the loop reflects the merged code.
func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
	c.t.Helper()

	var sg smGroup
	peers := serverPeerNames(servers)

	for _, s := range servers {
		// --- removed in this PR: inline WAL + node construction ---
		var cfg *RaftConfig
		if st == FileStorage {
			fs, err := newFileStore(
				FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
				StreamConfig{Name: name, Storage: FileStorage},
			)
			require_NoError(c.t, err)
			cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
		} else {
			ms, err := newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage})
			require_NoError(c.t, err)
			cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: ms}
		}
		s.bootstrapRaftNode(cfg, peers, true)
		n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
		require_NoError(c.t, err)
		sm := smf(s, cfg, n)
		sg = append(sg, sm)
		go smLoop(sm)
		// --- added in this PR: delegate to the shared helpers ---
		cfg := &RaftConfig{
			Name:  name,
			Store: c.t.TempDir(),
			Log:   c.createWAL(name, st)}
		sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
	}
	return sg
}

// addNodeEx spins up a brand new server in the cluster and joins it to
// the named raft group, backing its log with the given storage type.
// Returns the new node's state machine.
func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
	c.t.Helper()

	s := c.addInNewServer()
	cfg := &RaftConfig{
		Name:  name,
		Store: c.t.TempDir(),
		Log:   c.createWAL(name, st),
	}
	return c.createStateMachine(s, cfg, serverPeerNames(c.servers), smf)
}

// addRaftNode adds a new file-storage-backed node to the named raft group.
func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
	return c.addNodeEx(name, smf, FileStorage)
}

// addMemRaftNode adds a new memory-storage-backed node to the named raft group.
func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
	return c.addNodeEx(name, smf, MemoryStorage)
}

// Driver program for the state machine.
// Should be run in its own go routine.
func smLoop(sm stateMachine) {
Expand Down
100 changes: 99 additions & 1 deletion server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,11 @@ func TestNRGQuorumAccounting(t *testing.T) {
nats1 := "yrzKKRBu" // "nats-1"
nats2 := "cnrtt3eg" // "nats-2"

n.Lock()
n.addPeer(nats1)
n.addPeer(nats2)
n.Unlock()

// Timeline
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
Expand Down Expand Up @@ -2283,6 +2288,11 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
nats1 := "yrzKKRBu" // "nats-1"
nats2 := "cnrtt3eg" // "nats-2"

n.Lock()
n.addPeer(nats1)
n.addPeer(nats2)
n.Unlock()

// Timeline
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
Expand Down Expand Up @@ -3648,6 +3658,8 @@ func TestNRGLostQuorum(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

nats0 := "S1Nunr6R" // "nats-0"

require_Equal(t, n.State(), Follower)
require_False(t, n.Quorum())
require_True(t, n.lostQuorum())
Expand All @@ -3659,7 +3671,7 @@ func TestNRGLostQuorum(t *testing.T) {
// Respond to a vote request.
sub, err := nc.Subscribe(n.vsubj, func(m *nats.Msg) {
req := decodeVoteRequest(m.Data, m.Reply)
resp := voteResponse{term: req.term, peer: "random", granted: true}
resp := voteResponse{term: req.term, peer: nats0, granted: true}
m.Respond(resp.encode())
})
require_NoError(t, err)
Expand All @@ -3672,6 +3684,10 @@ func TestNRGLostQuorum(t *testing.T) {
require_False(t, n.Quorum())
require_True(t, n.lostQuorum())

n.Lock()
n.addPeer(nats0)
n.Unlock()

n.runAsCandidate()
require_Equal(t, n.State(), Leader)
require_True(t, n.Quorum())
Expand Down Expand Up @@ -3816,6 +3832,11 @@ func TestNRGQuorumAfterLeaderStepdown(t *testing.T) {
nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

n.Lock()
n.addPeer(nats0)
n.addPeer(nats1)
n.Unlock()

// Become leader.
n.switchToCandidate()
n.switchToLeader()
Expand Down Expand Up @@ -4411,3 +4432,80 @@ func TestNRGLeaderResurrectsRemovedPeers(t *testing.T) {
followers[1].restart()
require_Equal(t, len(leader.node().Peers()), 2)
}

// TestNRGAddPeers grows a 3-node raft group to 9 members one node at a
// time and verifies the leader's view of the cluster size converges.
func TestNRGAddPeers(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
	leader := rg.waitOnLeader()
	require_Equal(t, leader.node().ClusterSize(), 3)

	// Add six more members, growing the group from 3 to 9.
	for i := 0; i < 6; i++ {
		rg = append(rg, c.addMemRaftNode("TEST", newStateAdder))
	}

	// Wait for the leader to observe all additions.
	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if leader.node().ClusterSize() == 9 {
			return nil
		}
		return errors.New("node additions still in progress")
	})

	require_Equal(t, leader.node().ClusterSize(), 9)
}

// TestNRGDisjointMajorities verifies a leader cannot commit membership
// changes using only newly added peers while the original follower
// majority is unresponsive.
func TestNRGDisjointMajorities(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
	rg.waitOnLeader()

	leader := rg.leader()
	followers := rg.followers()
	require_Equal(t, len(followers), 2)

	// Lock all followers, a majority.
	locked := rg.lockFollowers()
	require_Equal(t, len(locked), 2)

	// Release the followers at the end of the test.
	defer func() {
		for _, l := range locked {
			l.node().(*raft).Unlock()
		}
	}()

	// Add one node (cluster size becomes 4).
	c.addMemRaftNode("TEST", newStateAdder)

	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if leader.node().ClusterSize() != 4 {
			return errors.New("node addition still in progress")
		}
		return nil
	})

	// Attempt another node addition.
	c.addMemRaftNode("TEST", newStateAdder)

	// If the bug is present:
	// The leader is able to form a majority because it would
	// add peers immediately, and readjust cluster size and
	// quorum only after committing EntryAddPeer. This allowed
	// the leader to commit the entries using only newly added
	// nodes (the original followers are locked and not
	// acknowledging the EntryAddPeer proposals!).
	// This should never happen. In a real scenario the
	// followers could be partitioned, and they actually have
	// a majority... so they could diverge and we end up with
	// two different histories.
	//
	// Here wait a little bit, and check that the leader is
	// unable to make any progress.
	time.Sleep(time.Second)

	require_Equal(t, leader.node().ClusterSize(), 4)
	require_Equal(t, leader.node().MembershipChangeInProgress(), true)
}