Skip to content

Commit 9f4d960

Browse files
jnmoyneneilalexander
authored andcommitted
Make the deduplication window actually work when sourcing
Allows sourcing multiple streams into a stream configured with discard new per subject, for de-duplication purposes. This change only contains the fix for the deduplication window when sourcing; it does not include the 'skip on discard new' part, which will be re-introduced in a different PR. Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
1 parent 304e184 commit 9f4d960

File tree

3 files changed

+323
-17
lines changed

3 files changed

+323
-17
lines changed

server/jetstream_cluster_4_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7636,3 +7636,156 @@ func TestJetStreamClusterConsumerSetStoreStateOldUpdateRestart(t *testing.T) {
76367636
require_NoError(t, err)
76377637
}
76387638
}
7639+
7640+
func TestJetStreamClusterSourcingIntoDiscardNewPerSubject(t *testing.T) {
7641+
c := createJetStreamClusterExplicit(t, "R3S", 3)
7642+
defer c.shutdown()
7643+
7644+
nc, js := jsClientConnect(t, c.randomServer())
7645+
defer nc.Close()
7646+
7647+
_, err := js.AddStream(&nats.StreamConfig{
7648+
Name: "A",
7649+
Subjects: []string{"foo.*"},
7650+
Duplicates: 100 * time.Millisecond,
7651+
Replicas: 3,
7652+
})
7653+
require_NoError(t, err)
7654+
7655+
sConfig := StreamConfig{
7656+
Name: "B",
7657+
Storage: MemoryStorage,
7658+
Sources: []*StreamSource{{Name: "A"}},
7659+
MaxMsgsPer: 1,
7660+
Discard: DiscardNew,
7661+
DiscardNewPer: true,
7662+
Duplicates: 10 * time.Second,
7663+
Replicas: 3,
7664+
}
7665+
7666+
req, err := json.Marshal(&sConfig)
7667+
require_NoError(t, err)
7668+
7669+
r, err := nc.Request("$JS.API.STREAM.CREATE.B", req, 5*time.Second)
7670+
require_NoError(t, err)
7671+
7672+
var resp JSApiStreamCreateResponse
7673+
err = json.Unmarshal(r.Data, &resp)
7674+
require_NoError(t, err)
7675+
require_Equal(t, resp.Error, nil)
7676+
7677+
_, err = js.Publish("foo.1", []byte("1"))
7678+
require_NoError(t, err)
7679+
7680+
// This second publish to the same subject should not be sourced
7681+
// due to discard new per subject with MaxMsgsPer=1.
7682+
_, err = js.Publish("foo.1", []byte("2"))
7683+
require_NoError(t, err)
7684+
7685+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
7686+
si, err := js.StreamInfo("B")
7687+
if err != nil {
7688+
return err
7689+
}
7690+
if si.State.Msgs != 1 {
7691+
return fmt.Errorf("expected 1 message, got %d", si.State.Msgs)
7692+
}
7693+
return nil
7694+
})
7695+
7696+
// Check the message content.
7697+
msgp, err := js.GetMsg("B", uint64(1))
7698+
require_NoError(t, err)
7699+
require_Equal(t, msgp.Subject, "foo.1")
7700+
require_Equal(t, string(msgp.Data), "1")
7701+
7702+
// Purge stream B so sourcing can continue past the per-subject limit.
7703+
require_NoError(t, js.PurgeStream("B"))
7704+
7705+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
7706+
si, err := js.StreamInfo("B")
7707+
if err != nil {
7708+
return err
7709+
}
7710+
if si.State.Msgs != 1 {
7711+
return fmt.Errorf("expected 1 message, got %d", si.State.Msgs)
7712+
}
7713+
return nil
7714+
})
7715+
7716+
// After purge, the second message should now be sourced.
7717+
msgp, err = js.GetMsg("B", uint64(2))
7718+
require_NoError(t, err)
7719+
require_Equal(t, msgp.Subject, "foo.1")
7720+
require_Equal(t, string(msgp.Data), "2")
7721+
7722+
// Test duplicate message ID handling: publish with explicit Nats-Msg-Id.
7723+
msg := nats.NewMsg("foo.2")
7724+
msg.Data = []byte("1")
7725+
msg.Header.Set("Nats-Msg-Id", "dup1")
7726+
7727+
_, err = js.PublishMsg(msg)
7728+
require_NoError(t, err)
7729+
7730+
// Must be able to source the message after the duplicate.
7731+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
7732+
si, err := js.StreamInfo("B")
7733+
if err != nil {
7734+
return err
7735+
}
7736+
if si.State.Msgs != 2 {
7737+
return fmt.Errorf("expected 2 messages, got %d", si.State.Msgs)
7738+
}
7739+
return nil
7740+
})
7741+
7742+
msgp, err = js.GetMsg("B", uint64(3))
7743+
require_NoError(t, err)
7744+
require_Equal(t, msgp.Subject, "foo.2")
7745+
require_Equal(t, string(msgp.Data), "1")
7746+
7747+
// Wait for the dedup window on stream A (100ms) to expire.
7748+
time.Sleep(200 * time.Millisecond)
7749+
7750+
// Publish a message with the same Nats-Msg-Id but to a different subject.
7751+
// Stream A's dedup window has expired, so A will accept it.
7752+
// Stream B has a 10s dedup window, so it should detect this as a duplicate and skip it.
7753+
msg = nats.NewMsg("foo.3")
7754+
msg.Data = []byte("1")
7755+
msg.Header.Set("Nats-Msg-Id", "dup1")
7756+
7757+
_, err = js.PublishMsg(msg)
7758+
require_NoError(t, err)
7759+
7760+
// The duplicate should be skipped by B; message count should stay at 2.
7761+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
7762+
si, err := js.StreamInfo("B")
7763+
if err != nil {
7764+
return err
7765+
}
7766+
if si.State.Msgs != 2 {
7767+
return fmt.Errorf("expected 2 messages, got %d", si.State.Msgs)
7768+
}
7769+
return nil
7770+
})
7771+
7772+
// Sourcing must continue processing after the skipped duplicate.
7773+
_, err = js.Publish("foo.4", []byte("1"))
7774+
require_NoError(t, err)
7775+
7776+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
7777+
si, err := js.StreamInfo("B")
7778+
if err != nil {
7779+
return err
7780+
}
7781+
if si.State.Msgs != 3 {
7782+
return fmt.Errorf("expected 3 messages, got %d", si.State.Msgs)
7783+
}
7784+
return nil
7785+
})
7786+
7787+
msgp, err = js.GetMsg("B", uint64(4))
7788+
require_NoError(t, err)
7789+
require_Equal(t, msgp.Subject, "foo.4")
7790+
require_Equal(t, string(msgp.Data), "1")
7791+
}

server/jetstream_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23119,3 +23119,153 @@ func TestJetStreamMirrorSetupStartGoRoutineFailMissingWgDone(t *testing.T) {
2311923119
t.Fatal("mirror.wg.Wait() blocked indefinitely: missing wg.Done() in startGoRoutine failure path")
2312023120
}
2312123121
}
23122+
23123+
func TestJetStreamSourcingIntoDiscardNewPerSubject(t *testing.T) {
23124+
s := RunBasicJetStreamServer(t)
23125+
defer s.Shutdown()
23126+
23127+
nc, js := jsClientConnect(t, s)
23128+
defer nc.Close()
23129+
23130+
_, err := js.AddStream(&nats.StreamConfig{
23131+
Name: "A",
23132+
Subjects: []string{"foo.*"},
23133+
Duplicates: 100 * time.Millisecond,
23134+
})
23135+
require_NoError(t, err)
23136+
23137+
sConfig := StreamConfig{
23138+
Name: "B",
23139+
Storage: MemoryStorage,
23140+
Sources: []*StreamSource{{Name: "A"}},
23141+
MaxMsgsPer: 1,
23142+
Discard: DiscardNew,
23143+
DiscardNewPer: true,
23144+
Duplicates: 10 * time.Second,
23145+
}
23146+
23147+
req, err := json.Marshal(&sConfig)
23148+
require_NoError(t, err)
23149+
23150+
r, err := nc.Request("$JS.API.STREAM.CREATE.B", req, 5*time.Second)
23151+
require_NoError(t, err)
23152+
23153+
var resp JSApiConsumerCreateResponse
23154+
err = json.Unmarshal(r.Data, &resp)
23155+
require_NoError(t, err)
23156+
require_Equal(t, resp.Error, nil)
23157+
23158+
_, err = js.Publish("foo.1", ([]byte)("1"))
23159+
require_NoError(t, err)
23160+
23161+
// this will not be sourced as discard new per subject
23162+
_, err = js.Publish("foo.1", ([]byte)("2"))
23163+
require_NoError(t, err)
23164+
23165+
var si *nats.StreamInfo
23166+
23167+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
23168+
si, err = js.StreamInfo("B")
23169+
if err != nil {
23170+
return err
23171+
}
23172+
if si.State.Msgs != 1 {
23173+
return fmt.Errorf("expected 1 messages, got %d", si.State.Msgs)
23174+
}
23175+
return nil
23176+
})
23177+
23178+
// Check the message
23179+
msgp, err := js.GetMsg("B", uint64(1))
23180+
require_NoError(t, err)
23181+
require_Equal(t, msgp.Subject, "foo.1")
23182+
require_Equal(t, string(msgp.Data), "1")
23183+
23184+
// now purge the stream so sourcing can continue
23185+
require_NoError(t, js.PurgeStream("B"))
23186+
23187+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
23188+
si, err = js.StreamInfo("B")
23189+
if err != nil {
23190+
return err
23191+
}
23192+
if si.State.Msgs != 1 {
23193+
return fmt.Errorf("expected 1 messages, got %d", si.State.Msgs)
23194+
}
23195+
return nil
23196+
})
23197+
23198+
// check the message
23199+
msgp, err = js.GetMsg("B", uint64(2))
23200+
require_NoError(t, err)
23201+
require_Equal(t, msgp.Subject, "foo.1")
23202+
require_Equal(t, string(msgp.Data), "2")
23203+
23204+
msg := nats.NewMsg("foo.2")
23205+
msg.Data = []byte("1")
23206+
msg.Header.Set("Nats-Msg-Id", "1")
23207+
23208+
_, err = js.PublishMsg(msg)
23209+
require_NoError(t, err)
23210+
23211+
// Must be able to move on and source the next message after the duplicate
23212+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
23213+
si, err = js.StreamInfo("B")
23214+
if err != nil {
23215+
return err
23216+
}
23217+
if si.State.Msgs != 2 {
23218+
return fmt.Errorf("expected 2 messages, got %d", si.State.Msgs)
23219+
}
23220+
return nil
23221+
})
23222+
23223+
// check the message
23224+
msgp, err = js.GetMsg("B", uint64(3))
23225+
require_NoError(t, err)
23226+
require_Equal(t, msgp.Subject, "foo.2")
23227+
require_Equal(t, string(msgp.Data), "1")
23228+
23229+
time.Sleep(200 * time.Millisecond)
23230+
23231+
msg = nats.NewMsg("foo.3")
23232+
msg.Data = []byte("1")
23233+
msg.Header.Set("Nats-Msg-Id", "1")
23234+
23235+
_, err = js.PublishMsg(msg)
23236+
require_NoError(t, err)
23237+
23238+
// Duplicate message id, should get skipped
23239+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
23240+
si, err = js.StreamInfo("B")
23241+
if err != nil {
23242+
return err
23243+
}
23244+
if si.State.Msgs != 2 {
23245+
return fmt.Errorf("expected 2 messages, got %d", si.State.Msgs)
23246+
}
23247+
return nil
23248+
})
23249+
23250+
// Must be able to move on
23251+
_, err = js.Publish("foo.4", ([]byte)("1"))
23252+
require_NoError(t, err)
23253+
23254+
// Must be able to move on and source the next message after the duplicate
23255+
checkFor(t, 4*time.Second, 200*time.Millisecond, func() error {
23256+
si, err = js.StreamInfo("B")
23257+
if err != nil {
23258+
return err
23259+
}
23260+
if si.State.Msgs != 3 {
23261+
return fmt.Errorf("expected 3 messages, got %d", si.State.Msgs)
23262+
}
23263+
return nil
23264+
})
23265+
23266+
msgp, err = js.GetMsg("B", uint64(4))
23267+
require_NoError(t, err)
23268+
require_Equal(t, msgp.Subject, "foo.4")
23269+
require_Equal(t, string(msgp.Data), "1")
23270+
23271+
}

server/stream.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1830,7 +1830,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
18301830
}
18311831
}
18321832

1833-
// check for duplicates
1833+
// check sources for duplicates
18341834
var iNames = make(map[string]struct{})
18351835
for _, src := range cfg.Sources {
18361836
if src == nil || !isValidName(src.Name) {
@@ -3983,7 +3983,6 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
39833983
} else {
39843984
err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0, nil, true, true)
39853985
}
3986-
39873986
if err != nil {
39883987
s := mset.srv
39893988
if strings.Contains(err.Error(), "no space left") {
@@ -3993,31 +3992,35 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
39933992
mset.mu.RLock()
39943993
accName, sname, iName := mset.acc.Name, mset.cfg.Name, si.iname
39953994
mset.mu.RUnlock()
3996-
3997-
// Can happen temporarily all the time during normal operations when the sourcing stream
3998-
// is working queue/interest with a limit and discard new.
3999-
// TODO - Improve sourcing to WQ with limit and new to use flow control rather than re-creating the consumer.
4000-
if errors.Is(err, ErrMaxMsgs) || errors.Is(err, ErrMaxBytes) {
3995+
// Can happen temporarily all the time during normal operations when the sourcing stream is discard new
3996+
// (example use case is for sourcing into a work queue)
3997+
// TODO - Maybe improve sourcing to WQ with limit and new to use flow control rather than re-creating the consumer.
3998+
if errors.Is(err, ErrMaxMsgs) || errors.Is(err, ErrMaxBytes) || errors.Is(err, ErrMaxMsgsPerSubject) {
40013999
// Do not need to do a full retry that includes finding the last sequence in the stream
40024000
// for that source. Just re-create starting with the seq we couldn't store instead.
40034001
mset.mu.Lock()
40044002
mset.retrySourceConsumerAtSeq(iName, si.sseq)
40054003
mset.mu.Unlock()
40064004
} else {
4007-
// Log some warning for errors other than errLastSeqMismatch or errMaxMsgs.
4008-
if !errors.Is(err, errLastSeqMismatch) {
4005+
// Log a warning for errors other than errLastSeqMismatch and errMsgIdDuplicate.
4006+
if !errors.Is(err, errLastSeqMismatch) && !errors.Is(err, errMsgIdDuplicate) {
40094007
s.RateLimitWarnf("Error processing inbound source %q for '%s' > '%s': %v",
40104008
iName, accName, sname, err)
40114009
}
4012-
// Retry in all type of errors if we are still leader.
4010+
// Retry on all types of errors that we do not want to skip, if we are still leader.
40134011
if mset.isLeader() {
4014-
// This will make sure the source is still in mset.sources map,
4015-
// find the last sequence and then call setupSourceConsumer.
4016-
iNameMap := map[string]struct{}{iName: {}}
4017-
mset.setStartingSequenceForSources(iNameMap)
4018-
mset.mu.Lock()
4019-
mset.retrySourceConsumerAtSeq(iName, si.sseq+1)
4020-
mset.mu.Unlock()
4012+
if !errors.Is(err, errMsgIdDuplicate) {
4013+
// This will make sure the source is still in mset.sources map,
4014+
// find the last sequence and then call setupSourceConsumer.
4015+
iNameMap := map[string]struct{}{iName: {}}
4016+
mset.setStartingSequenceForSources(iNameMap)
4017+
mset.mu.Lock()
4018+
mset.retrySourceConsumerAtSeq(iName, si.sseq+1)
4019+
mset.mu.Unlock()
4020+
} else {
4021+
// Skip the duplicate message but keep processing the rest of the batch.
4022+
return true
4023+
}
40214024
}
40224025
}
40234026
}

0 commit comments

Comments
 (0)