@@ -2339,7 +2339,7 @@ func (fs *fileStore) recoverMsgs() error {
23392339 for _ , mb := range emptyBlks {
23402340 // Need the mb lock here.
23412341 mb .mu .Lock ()
2342- fs .removeMsgBlock (mb )
2342+ fs .forceRemoveMsgBlock (mb )
23432343 mb .mu .Unlock ()
23442344 }
23452345 }
@@ -4669,7 +4669,7 @@ func (fs *fileStore) rebuildFirst() {
46694669 fmb .mu .RUnlock ()
46704670 if isEmpty {
46714671 fmb .mu .Lock ()
4672- fs .removeMsgBlock (fmb )
4672+ fs .forceRemoveMsgBlock (fmb )
46734673 fmb .mu .Unlock ()
46744674 }
46754675 fs .selectNextFirst ()
@@ -5102,8 +5102,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
51025102 isLastBlock := mb == fs .lmb
51035103 isEmpty := mb .msgs == 0
51045104
5105- // If erase but block is empty, we can simply remove the block later .
5106- if secure && ! isEmpty {
5105+ // Must always perform the erase, even if the block is empty as it could contain tombstones .
5106+ if secure {
51075107 // Grab record info, but use the pre-computed record length.
51085108 ri , _ , _ , err := mb .slotInfo (int (seq - mb .cache .fseq ))
51095109 if err != nil {
@@ -5222,15 +5222,15 @@ func (mb *msgBlock) shouldCompactSync() bool {
52225222// This will compact and rewrite this block. This version will not process any tombstone cleanup.
52235223// Write lock needs to be held.
52245224func (mb * msgBlock ) compact () {
5225- mb .compactWithFloor (0 )
5225+ mb .compactWithFloor (0 , nil )
52265226}
52275227
52285228// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
52295229// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
52305230// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
52315231// if fseq > 0 we will attempt to cleanup stale tombstones.
52325232// Write lock needs to be held.
5233- func (mb * msgBlock ) compactWithFloor (floor uint64 ) {
5233+ func (mb * msgBlock ) compactWithFloor (floor uint64 , fsDmap * avl. SequenceSet ) {
52345234 wasLoaded := mb .cache != nil && mb .cacheAlreadyLoaded ()
52355235 if ! wasLoaded {
52365236 if err := mb .loadMsgsWithLock (); err != nil {
@@ -5275,7 +5275,9 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
52755275 // If this entry is for a lower seq than ours then keep around.
52765276 // We also check that it is greater than our floor. Floor is zero on normal
52775277 // calls to compact.
5278- if seq < fseq && seq >= floor {
5278+ // If the global delete map is set, check if a tombstone is still
5279+ // referencing a message in another block. If not, it can be removed.
5280+ if seq < fseq && seq >= floor && (fsDmap == nil || fsDmap .Exists (seq )) {
52795281 nbuf = append (nbuf , buf [index :index + rl ]... )
52805282 }
52815283 } else {
@@ -6791,6 +6793,9 @@ func (fs *fileStore) syncBlocks() {
67916793 fs .firstMoved = false
67926794 fs .mu .Unlock ()
67936795
6796+ var fsDmapLoaded bool
6797+ var fsDmap avl.SequenceSet
6798+
67946799 var markDirty bool
67956800 for _ , mb := range blks {
67966801 // Do actual sync. Hold lock for consistency.
@@ -6828,9 +6833,16 @@ func (fs *fileStore) syncBlocks() {
68286833 // Check if we should compact here.
68296834 // Need to hold fs lock in case we reference psim when loading in the mb and we may remove this block if truly empty.
68306835 if needsCompact {
6836+ // Load a delete map containing only interior deletes.
6837+ // This is used when compacting to know if tombstones are still relevant,
6838+ // and if not they can be compacted.
6839+ if ! fsDmapLoaded {
6840+ fsDmapLoaded = true
6841+ fsDmap = fs .deleteMap ()
6842+ }
68316843 fs .mu .RLock ()
68326844 mb .mu .Lock ()
6833- mb .compactWithFloor (firstSeq )
6845+ mb .compactWithFloor (firstSeq , & fsDmap )
68346846 // If this compact removed all raw bytes due to tombstone cleanup, schedule to remove.
68356847 shouldRemove := mb .rbytes == 0
68366848 mb .mu .Unlock ()
@@ -9112,6 +9124,55 @@ func (mb *msgBlock) tombsLocked() []msgId {
91129124 return tombs
91139125}
91149126
9127+ // Return number of tombstones for messages prior to this msgBlock.
9128+ // Both locks should be held.
9129+ // Write lock should be held for block.
9130+ func (mb * msgBlock ) numPriorTombsLocked () int {
9131+ if mb .cacheNotLoaded () {
9132+ if err := mb .loadMsgsWithLock (); err != nil {
9133+ return 0
9134+ }
9135+ }
9136+ defer mb .finishedWithCache ()
9137+
9138+ var fseq uint64
9139+ var tombs int
9140+ var le = binary .LittleEndian
9141+ buf := mb .cache .buf
9142+
9143+ for index , lbuf := uint32 (0 ), uint32 (len (buf )); index < lbuf ; {
9144+ if index + msgHdrSize > lbuf {
9145+ return tombs
9146+ }
9147+ hdr := buf [index : index + msgHdrSize ]
9148+ rl , seq := le .Uint32 (hdr [0 :]), le .Uint64 (hdr [4 :])
9149+ // Clear any headers bit that could be set.
9150+ rl &^= hbit
9151+ // Check for tombstones.
9152+ if seq & tbit != 0 {
9153+ seq = seq &^ tbit
9154+ // Tombstones below the global first seq are irrelevant.
9155+ // And we only count tombstones below this block's first seq.
9156+ if seq >= mb .fs .state .FirstSeq && (fseq == 0 || seq < fseq ) {
9157+ tombs ++
9158+ }
9159+ index += rl
9160+ continue
9161+ }
9162+ if seq == 0 || seq & ebit != 0 {
9163+ index += rl
9164+ continue
9165+ }
9166+ // Advance to next record.
9167+ index += rl
9168+ if fseq == 0 {
9169+ fseq = seq
9170+ }
9171+ }
9172+
9173+ return tombs
9174+ }
9175+
91159176// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
91169177func (fs * fileStore ) Truncate (seq uint64 ) error {
91179178 // Check for request to reset.
@@ -9360,7 +9421,10 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
93609421 fs .writeTombstone (lseq , lts )
93619422 }
93629423 // Only delete message block after (potentially) writing a tombstone.
9363- fs .forceRemoveMsgBlock (mb )
9424+ // But only if it doesn't contain any tombstones for prior blocks.
9425+ if mb .numPriorTombsLocked () == 0 {
9426+ fs .forceRemoveMsgBlock (mb )
9427+ }
93649428}
93659429
93669430// Removes the msgBlock, without writing tombstones to ensure the last sequence is preserved.
@@ -10584,7 +10648,18 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
1058410648 // Detect if we have a gap between these blocks.
1058510649 fseq := atomic .LoadUint64 (& mb .first .seq )
1058610650 if prevLast > 0 && prevLast + 1 != fseq {
10587- dbs = append (dbs , & DeleteRange {First : prevLast + 1 , Num : fseq - prevLast - 1 })
10651+ var reuseGap bool
10652+ if len (dbs ) > 0 {
10653+ // Detect multiple blocks that only contain large gaps. We can simply make
10654+ // the previous gap larger to account for these, instead of adding a new range.
10655+ if dr , ok := dbs [len (dbs )- 1 ].(* DeleteRange ); ok {
10656+ dr .Num += fseq - prevLast - 1
10657+ reuseGap = true
10658+ }
10659+ }
10660+ if ! reuseGap {
10661+ dbs = append (dbs , & DeleteRange {First : prevLast + 1 , Num : fseq - prevLast - 1 })
10662+ }
1058810663 }
1058910664 if mb .dmap .Size () > 0 {
1059010665 dbs = append (dbs , & mb .dmap )
@@ -10594,6 +10669,22 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
1059410669 return dbs
1059510670}
1059610671
10672+ // deleteMap returns all interior deletes for each block based on the mb.dmap.
10673+ // Specifically, this will not contain any deletes for blocks that have been removed.
10674+ // This is useful to know whether a tombstone is still relevant and marked as deleted by an active block.
10675+ // All blocks should be at least read locked.
10676+ func (fs * fileStore ) deleteMap () (dmap avl.SequenceSet ) {
10677+ for _ , mb := range fs .blks {
10678+ if mb .dmap .Size () > 0 {
10679+ mb .dmap .Range (func (seq uint64 ) bool {
10680+ dmap .Insert (seq )
10681+ return true
10682+ })
10683+ }
10684+ }
10685+ return dmap
10686+ }
10687+
1059710688// SyncDeleted will make sure this stream has same deleted state as dbs.
1059810689// This will only process deleted state within our current state.
1059910690func (fs * fileStore ) SyncDeleted (dbs DeleteBlocks ) {
0 commit comments