Skip to content

Commit 1823f69

Browse files
[FIXED] Filestore uses SkipMsgs for recovered trailing deletes
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 9d3b2b4 commit 1823f69

File tree

2 files changed

+69
-17
lines changed

2 files changed

+69
-17
lines changed

server/filestore.go

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -512,19 +512,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
512512
// Check if our prior state remembers a last sequence past where we can see.
513513
// Unless we're async flushing, in which case this can happen if some blocks weren't flushed.
514514
if prior.LastSeq > fs.state.LastSeq && !fs.fcfg.AsyncFlush {
515-
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
516-
if fs.state.Msgs == 0 {
517-
fs.state.FirstSeq = fs.state.LastSeq + 1
518-
fs.state.FirstTime = time.Time{}
519-
}
520-
if fs.ld != nil {
521-
if _, err := fs.newMsgBlockForWrite(); err == nil {
522-
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
523-
return nil, err
524-
}
525-
} else {
526-
return nil, err
527-
}
515+
if err = fs.skipMsgsLocked(fs.state.LastSeq+1, prior.LastSeq-fs.state.LastSeq, prior.LastTime.UnixNano()); err != nil {
516+
return nil, err
528517
}
529518
}
530519
// Since we recovered here, make sure to kick ourselves to write out our stream state.
@@ -4852,7 +4841,11 @@ func (fs *fileStore) SkipMsg(seq uint64) (uint64, error) {
48524841
func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
48534842
fs.mu.Lock()
48544843
defer fs.mu.Unlock()
4844+
now := ats.AccessTime()
4845+
return fs.skipMsgsLocked(seq, num, now)
4846+
}
48554847

4848+
func (fs *fileStore) skipMsgsLocked(seq uint64, num uint64, ts int64) error {
48564849
// Check sequence matches our last sequence.
48574850
if seq != fs.state.LastSeq+1 {
48584851
if seq > 0 {
@@ -4877,14 +4870,13 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
48774870
}
48784871

48794872
// Insert into dmap all entries and place last as marker.
4880-
now := ats.AccessTime()
48814873
lseq := seq + num - 1
48824874

48834875
mb.mu.Lock()
48844876
// If we are empty update meta directly.
48854877
if mb.msgs == 0 {
48864878
atomic.StoreUint64(&mb.last.seq, lseq)
4887-
mb.last.ts = now
4879+
mb.last.ts = ts
48884880
atomic.StoreUint64(&mb.first.seq, lseq+1)
48894881
mb.first.ts = 0
48904882
} else {
@@ -4893,12 +4885,12 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
48934885
}
48944886
}
48954887
// Write out our placeholder.
4896-
mb.writeMsgRecordLocked(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true, true)
4888+
mb.writeMsgRecordLocked(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, ts, true, true)
48974889
mb.mu.Unlock()
48984890

48994891
// Now update FS accounting.
49004892
// Update fs state.
4901-
fs.state.LastSeq, fs.state.LastTime = lseq, time.Unix(0, now).UTC()
4893+
fs.state.LastSeq, fs.state.LastTime = lseq, time.Unix(0, ts).UTC()
49024894
if fs.state.Msgs == 0 {
49034895
fs.state.FirstSeq, fs.state.FirstTime = lseq+1, time.Time{}
49044896
}

server/filestore_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12657,3 +12657,63 @@ func TestFileStoreDeleteBlocksWithManyEmptyBlocks(t *testing.T) {
1265712657
&DeleteRange{First: 2, Num: 13},
1265812658
})
1265912659
}
12660+
12661+
func TestFileStoreTrailingSkipMsgsFromStreamStateFile(t *testing.T) {
12662+
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
12663+
cfg := StreamConfig{Name: "zzz", Storage: FileStorage, Subjects: []string{">"}}
12664+
created := time.Now()
12665+
fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
12666+
require_NoError(t, err)
12667+
defer fs.Stop()
12668+
12669+
for range 10 {
12670+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
12671+
require_NoError(t, err)
12672+
}
12673+
12674+
fs.mu.RLock()
12675+
lmb := fs.lmb
12676+
fs.mu.RUnlock()
12677+
lmb.mu.RLock()
12678+
mfn := lmb.mfn
12679+
lmb.mu.RUnlock()
12680+
require_NotEqual(t, mfn, _EMPTY_)
12681+
12682+
// Stop the store and truncate the block file.
12683+
// Stopping should write out our stream state file, letting us "remember" the highest last sequence.
12684+
require_NoError(t, fs.Stop())
12685+
require_NoError(t, os.Truncate(mfn, 33*5))
12686+
12687+
fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
12688+
require_NoError(t, err)
12689+
defer fs.Stop()
12690+
12691+
// We should recover with the right state, the last sequence coming from the stream state file,
12692+
// and the messages from the recovered blocks.
12693+
state := fs.State()
12694+
require_Equal(t, state.Msgs, 5)
12695+
require_Equal(t, state.FirstSeq, 1)
12696+
require_Equal(t, state.LastSeq, 10)
12697+
12698+
// Go through all messages, they should properly report deletes.
12699+
for seq := uint64(1); seq <= 10; seq++ {
12700+
_, err = fs.LoadMsg(seq, nil)
12701+
if seq <= 5 {
12702+
require_NoError(t, err)
12703+
} else {
12704+
require_Error(t, err, errDeletedMsg)
12705+
}
12706+
}
12707+
12708+
// Also check the dmap to be sure.
12709+
fs.mu.RLock()
12710+
lmb = fs.lmb
12711+
lblks := len(fs.blks)
12712+
fs.mu.RUnlock()
12713+
lmb.mu.RLock()
12714+
ldmap := lmb.dmap.Size()
12715+
lmb.mu.RUnlock()
12716+
require_Len(t, lblks, 1)
12717+
require_Len(t, ldmap, 5)
12718+
})
12719+
}

0 commit comments

Comments
 (0)