Skip to content

Commit bf5a10c

Browse files
[FIXED] Filestore always use tombstone for recovered trailing deletes (#7782)
When restoring from a stale `index.db` stream state file, we still remember the last sequence that was used. If this last sequence was higher than is currently on disk, we move the recovered lower last sequence up to the highest seen last sequence. However, the last sequence and timestamp were adjusted even if no tombstone was actually written. Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
2 parents 019544d + 77159a8 commit bf5a10c

File tree

2 files changed

+67
-9
lines changed

2 files changed

+67
-9
lines changed

server/filestore.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -512,20 +512,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
512512
// Check if our prior state remembers a last sequence past where we can see.
513513
// Unless we're async flushing, in which case this can happen if some blocks weren't flushed.
514514
if prior.LastSeq > fs.state.LastSeq && !fs.fcfg.AsyncFlush {
515+
if mb, err := fs.newMsgBlockForWrite(); err != nil {
516+
return nil, err
517+
} else if err = mb.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
518+
return nil, err
519+
}
515520
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
516521
if fs.state.Msgs == 0 {
517522
fs.state.FirstSeq = fs.state.LastSeq + 1
518523
fs.state.FirstTime = time.Time{}
519524
}
520-
if fs.ld != nil {
521-
if _, err := fs.newMsgBlockForWrite(); err == nil {
522-
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
523-
return nil, err
524-
}
525-
} else {
526-
return nil, err
527-
}
528-
}
529525
}
530526
// Since we recovered here, make sure to kick ourselves to write out our stream state.
531527
fs.dirty++

server/filestore_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12657,3 +12657,65 @@ func TestFileStoreDeleteBlocksWithManyEmptyBlocks(t *testing.T) {
1265712657
&DeleteRange{First: 2, Num: 13},
1265812658
})
1265912659
}
12660+
12661+
func TestFileStoreTrailingSkipMsgsFromStreamStateFile(t *testing.T) {
12662+
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
12663+
cfg := StreamConfig{Name: "zzz", Storage: FileStorage, Subjects: []string{">"}}
12664+
created := time.Now()
12665+
fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
12666+
require_NoError(t, err)
12667+
defer fs.Stop()
12668+
12669+
for range 10 {
12670+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
12671+
require_NoError(t, err)
12672+
}
12673+
12674+
fs.mu.RLock()
12675+
lmb := fs.lmb
12676+
fs.mu.RUnlock()
12677+
lmb.mu.RLock()
12678+
mfn := lmb.mfn
12679+
lmb.mu.RUnlock()
12680+
require_NotEqual(t, mfn, _EMPTY_)
12681+
12682+
// Stop the store and truncate the block file.
12683+
// Stopping should write out our stream state file, letting us "remember" the highest last sequence.
12684+
require_NoError(t, fs.Stop())
12685+
require_NoError(t, os.Truncate(mfn, 33*5))
12686+
12687+
fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
12688+
require_NoError(t, err)
12689+
defer fs.Stop()
12690+
12691+
// We should recover with the right state, the last sequence coming from the stream state file,
12692+
// and the messages from the recovered blocks.
12693+
state := fs.State()
12694+
require_Equal(t, state.Msgs, 5)
12695+
require_Equal(t, state.FirstSeq, 1)
12696+
require_Equal(t, state.LastSeq, 10)
12697+
12698+
// Go through all messages, they should properly report deletes.
12699+
for seq := uint64(1); seq <= 10; seq++ {
12700+
_, err = fs.LoadMsg(seq, nil)
12701+
if seq <= 5 {
12702+
require_NoError(t, err)
12703+
} else {
12704+
require_Error(t, err, ErrStoreMsgNotFound)
12705+
}
12706+
}
12707+
12708+
// Also check a new block was created to represent this.
12709+
fs.mu.RLock()
12710+
lmb = fs.lmb
12711+
lblks := len(fs.blks)
12712+
fs.mu.RUnlock()
12713+
lmb.mu.RLock()
12714+
ldmap := lmb.dmap.Size()
12715+
lmb.mu.RUnlock()
12716+
require_Len(t, lblks, 2)
12717+
require_Len(t, ldmap, 0)
12718+
require_Equal(t, atomic.LoadUint64(&lmb.first.seq), 11)
12719+
require_Equal(t, atomic.LoadUint64(&lmb.last.seq), 10)
12720+
})
12721+
}

0 commit comments

Comments
 (0)