Commit e6b8cd0

[IMPROVED] Interest desync after consumer create/update (#7440)
2 parents: d50748c + d292abe

File tree: 2 files changed (+128, -0 lines)

server/jetstream_cluster_1_test.go

Lines changed: 116 additions & 0 deletions
@@ -10447,6 +10447,122 @@ func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) {
 }
 }
 
+func TestJetStreamClusterNoInterestDesyncOnConsumerCreate(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+	})
+	require_NoError(t, err)
+
+	// Pick a random server that will not know about the new consumer being created.
+	// If servers determine "no interest" individually, these servers will desync.
+	rs := c.randomNonLeader()
+	sjs := rs.getJetStream()
+	meta := sjs.getMetaGroup()
+	require_NoError(t, meta.PauseApply())
+
+	sub, err := js.PullSubscribe(_EMPTY_, "DURABLE", nats.BindStream("TEST"))
+	require_NoError(t, err)
+	defer sub.Drain()
+
+	checkConsumersAssigned := func(expected int) {
+		t.Helper()
+		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+			var count int
+			for _, s := range c.servers {
+				_, _, jsa := s.globalAccount().getJetStreamFromAccount()
+				if jsa.consumerAssigned("TEST", "DURABLE") {
+					count++
+				}
+			}
+			if count != expected {
+				return fmt.Errorf("expected %d, got %d", expected, count)
+			}
+			return nil
+		})
+	}
+	// Confirm only two servers know about the consumer.
+	checkConsumersAssigned(2)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE")
+
+	// Publish a single message. All servers will receive this, but only two will store it.
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	checkLastSeq := func(lseq uint64) {
+		t.Helper()
+		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+			for _, s := range c.servers {
+				mset, err := s.globalAccount().lookupStream("TEST")
+				if err != nil {
+					return err
+				}
+				if seq := mset.lastSeq(); seq != lseq {
+					return fmt.Errorf("expected %d, got %d", lseq, seq)
+				}
+			}
+			return nil
+		})
+	}
+	checkLastSeq(1)
+
+	// Resume the meta layer such that the consumer gets created on the remaining server.
+	meta.ResumeApply()
+	checkConsumersAssigned(3)
+
+	// All servers will now store another published message.
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	checkLastSeq(2)
+
+	// Make sure the consumer leader is on the same server that didn't store the first message.
+	cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE")
+	if cl != rs {
+		mset, err := cl.globalAccount().lookupStream("TEST")
+		require_NoError(t, err)
+		o := mset.lookupConsumer("DURABLE")
+		require_NotNil(t, o)
+		n := o.raftNode()
+		require_NoError(t, n.StepDown(rs.NodeName()))
+		c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE")
+		cl = c.consumerLeader(globalAccountName, "TEST", "DURABLE")
+		require_Equal(t, cl, rs)
+	}
+
+	// Since the consumer leader is the same as the server that didn't store the first message,
+	// it can only receive and ack the second message.
+	msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+	require_Len(t, len(msgs), 1)
+	metadata, err := msgs[0].Metadata()
+	require_NoError(t, err)
+	require_Equal(t, metadata.Sequence.Stream, 2)
+	require_Equal(t, metadata.NumPending, 0)
+	require_NoError(t, msgs[0].AckSync())
+
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		// The servers will eventually be synced up again, but this relies on the interest state being checked.
+		for _, s := range c.servers {
+			if s == rs {
+				continue
+			}
+			mset, err := s.globalAccount().lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			mset.checkInterestState()
+		}
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
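
The test's convergence checks lean on the suite's checkFor helper, which polls a condition until it holds or a deadline expires. Below is a rough, self-contained sketch of that polling pattern; pollFor and its exact failure handling are assumptions for illustration, not the server's actual helper.

package main

import (
	"fmt"
	"time"
)

// pollFor is a hypothetical stand-in for a checkFor-style helper: it re-runs
// check every interval until it returns nil or the total wait elapses, and
// surfaces the last error if the condition never held.
func pollFor(total, interval time.Duration, check func() error) error {
	deadline := time.Now().Add(total)
	var err error
	for {
		if err = check(); err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %v: %w", total, err)
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	// Wait for a toy condition to converge, polling every 100ms for up to 2s.
	err := pollFor(2*time.Second, 100*time.Millisecond, func() error {
		if time.Since(start) < 500*time.Millisecond {
			return fmt.Errorf("not converged yet")
		}
		return nil
	})
	fmt.Println("converged:", err == nil)
}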

server/stream.go

Lines changed: 12 additions & 0 deletions
@@ -7607,7 +7607,19 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) bool {
 	// Only propose message deletion to the stream if we're consumer leader, otherwise all followers would also propose.
 	// We must be the consumer leader, since we know for sure we've stored the message and don't register as pre-ack.
 	if o != nil && !o.IsLeader() {
+		// Currently, interest-based streams can race on "no interest" because consumer creates/updates go over
+		// the meta layer and published messages go over the stream layer. Some servers could then either store
+		// or not store some initial set of messages that gained new interest. To get the stream back in sync,
+		// we allow moving the first sequence up.
+		// TODO(mvv): later on only the stream leader should determine "no interest"
+		interestRaiseFirst := mset.cfg.Retention == InterestPolicy && seq == state.FirstSeq
 		mset.mu.Unlock()
+		if interestRaiseFirst {
+			if _, err := store.RemoveMsg(seq); err == ErrStoreEOF {
+				// This should not happen, but being pedantic.
+				mset.registerPreAckLock(o, seq)
+			}
+		}
 		// Must still mark as removal if follower. If we become leader later, we must be able to retry the proposal.
 		return true
 	}
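
To see why raising the first sequence re-syncs a replica that missed the initial interest, here is a minimal toy model of the follower-ack path. All types and names below are invented for illustration; only the shape of the check mirrors the diff above.

package main

import (
	"errors"
	"fmt"
)

// errStoreEOF plays the role of ErrStoreEOF: the acked sequence lies past
// everything this replica's store has seen. (Toy model, not server types.)
var errStoreEOF = errors.New("store EOF")

type toyStore struct {
	msgs     map[uint64]string
	firstSeq uint64
	lastSeq  uint64
}

func (s *toyStore) removeMsg(seq uint64) error {
	if seq > s.lastSeq {
		return errStoreEOF
	}
	delete(s.msgs, seq)
	// Advance firstSeq past any leading gap left by the removal.
	for s.firstSeq <= s.lastSeq {
		if _, ok := s.msgs[s.firstSeq]; ok {
			break
		}
		s.firstSeq++
	}
	return nil
}

// ackOnFollower mirrors the shape of the patched follower path: on an
// interest-retention stream, an ack at the store's first sequence removes
// the message so the first sequence can move up; if the store has never
// seen the sequence, fall back to a pre-ack marker.
func ackOnFollower(s *toyStore, interestRetention bool, seq uint64, preAcks map[uint64]bool) {
	if interestRetention && seq == s.firstSeq {
		if err := s.removeMsg(seq); errors.Is(err, errStoreEOF) {
			preAcks[seq] = true // should not happen, but be pedantic
		}
	}
}

func main() {
	// A replica that missed the consumer create never stored message 1,
	// so its view of the stream starts at sequence 2.
	desynced := &toyStore{msgs: map[uint64]string{2: "second"}, firstSeq: 2, lastSeq: 2}
	preAcks := map[uint64]bool{}

	// Acking sequence 2 lets the replica's first sequence advance to 3,
	// matching peers that stored and then deleted both messages.
	ackOnFollower(desynced, true, 2, preAcks)
	fmt.Println("firstSeq now:", desynced.firstSeq) // 3
}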
