@@ -5144,21 +5144,42 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
51445144 return false , nil
51455145 }
51465146
5147+ fifo := seq == atomic .LoadUint64 (& mb .first .seq )
5148+ isLastBlock := mb == fs .lmb
5149+ isEmpty := mb .msgs == 1 // ... about to be zero though.
5150+
51475151 // We used to not have to load in the messages except with callbacks or the filtered subject state (which is now always on).
51485152 // Now just load regardless.
51495153 // TODO(dlc) - Figure out a way not to have to load it in, we need subject tracking outside main data block.
5154+ var didLoad bool
51505155 if mb .cacheNotLoaded () {
51515156 if err := mb .loadMsgsWithLock (); err != nil {
51525157 mb .mu .Unlock ()
51535158 fsUnlock ()
51545159 return false , err
51555160 }
5161+ didLoad = true
5162+ }
5163+ finishedWithCache := func () {
5164+ if didLoad {
5165+ mb .finishedWithCache ()
5166+ }
51565167 }
51575168
51585169 var smv StoreMsg
5159- sm , err := mb .cacheLookupNoCopy (seq , & smv )
5170+ var sm * StoreMsg
5171+ var err error
5172+ if secure {
5173+ // For a secure erase we can't use NoCopy, as eraseMsg will overwrite the
5174+ // cache and we won't be able to access sm.subj etc anymore later on.
5175+ sm , err = mb .cacheLookup (seq , & smv )
5176+ } else {
5177+ // For a non-secure erase it's fine to use NoCopy, as the cache won't change
5178+ // from underneath us.
5179+ sm , err = mb .cacheLookupNoCopy (seq , & smv )
5180+ }
51605181 if err != nil {
5161- mb . finishedWithCache ()
5182+ finishedWithCache ()
51625183 mb .mu .Unlock ()
51635184 fsUnlock ()
51645185 // Mimic err behavior from above check to dmap. No error returned if already removed.
@@ -5167,12 +5188,51 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
51675188 }
51685189 return false , err
51695190 }
5191+
5192+ // Check if we need to write a deleted record tombstone.
5193+ // This is for user initiated removes or to hold the first seq
5194+ // when the last block is empty.
5195+ // If not via limits and not empty (empty writes tombstone above if last) write tombstone.
5196+ if ! viaLimits && ! isEmpty && sm != nil {
5197+ mb .mu .Unlock () // Only safe way to checkLastBlock is to unlock here...
5198+ lmb , err := fs .checkLastBlock (emptyRecordLen )
5199+ if err != nil {
5200+ finishedWithCache ()
5201+ fsUnlock ()
5202+ return false , err
5203+ }
5204+ if err := lmb .writeTombstone (sm .seq , sm .ts ); err != nil {
5205+ finishedWithCache ()
5206+ fsUnlock ()
5207+ return false , err
5208+ }
5209+ mb .mu .Lock () // We'll need the lock back to carry on safely.
5210+ }
5211+
51705212 // Grab size
51715213 msz := fileStoreMsgSize (sm .subj , sm .hdr , sm .msg )
51725214
51735215 // Set cache timestamp for last remove.
51745216 mb .lrts = ats .AccessTime ()
51755217
5218+ // Must always perform the erase, even if the block is empty as it could contain tombstones.
5219+ if secure {
5220+ // Grab record info, but use the pre-computed record length.
5221+ ri , _ , _ , err := mb .slotInfo (int (seq - mb .cache .fseq ))
5222+ if err != nil {
5223+ finishedWithCache ()
5224+ mb .mu .Unlock ()
5225+ fsUnlock ()
5226+ return false , err
5227+ }
5228+ if err := mb .eraseMsg (seq , int (ri ), int (msz ), isLastBlock ); err != nil {
5229+ finishedWithCache ()
5230+ mb .mu .Unlock ()
5231+ fsUnlock ()
5232+ return false , err
5233+ }
5234+ }
5235+
51765236 // Global stats
51775237 if fs .state .Msgs > 0 {
51785238 fs .state .Msgs --
@@ -5212,28 +5272,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
52125272 }
52135273 }
52145274
5215- fifo := seq == atomic .LoadUint64 (& mb .first .seq )
5216- isLastBlock := mb == fs .lmb
5217- isEmpty := mb .msgs == 0
5218-
5219- // If erase but block is empty, we can simply remove the block later.
5220- if secure && ! isEmpty {
5221- // Grab record info, but use the pre-computed record length.
5222- ri , _ , _ , err := mb .slotInfo (int (seq - mb .cache .fseq ))
5223- if err != nil {
5224- mb .finishedWithCache ()
5225- mb .mu .Unlock ()
5226- fsUnlock ()
5227- return false , err
5228- }
5229- if err := mb .eraseMsg (seq , int (ri ), int (msz ), isLastBlock ); err != nil {
5230- mb .finishedWithCache ()
5231- mb .mu .Unlock ()
5232- fsUnlock ()
5233- return false , err
5234- }
5235- }
5236-
52375275 if fifo {
52385276 mb .selectNextFirst ()
52395277 if ! isEmpty {
@@ -5274,7 +5312,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
52745312 fs .removeMsgBlock (mb )
52755313 firstSeqNeedsUpdate = seq == fs .state .FirstSeq
52765314 }
5277- mb . finishedWithCache ()
5315+ finishedWithCache ()
52785316 mb .mu .Unlock ()
52795317
52805318 // If we emptied the current message block and the seq was state.FirstSeq
@@ -5284,15 +5322,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
52845322 fs .selectNextFirst ()
52855323 }
52865324
5287- // Check if we need to write a deleted record tombstone.
5288- // This is for user initiated removes or to hold the first seq
5289- // when the last block is empty.
5290-
5291- // If not via limits and not empty (empty writes tombstone above if last) write tombstone.
5292- if ! viaLimits && ! isEmpty && sm != nil {
5293- fs .writeTombstone (sm .seq , sm .ts )
5294- }
5295-
52965325 if cb := fs .scb ; cb != nil {
52975326 // If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
52985327 fs .mu .Unlock ()
@@ -6914,10 +6943,9 @@ func (fs *fileStore) syncBlocks() {
69146943 continue
69156944 }
69166945 // See if we can close FDs due to being idle.
6917- if mb .mfd != nil && mb .sinceLastWriteActivity () > closeFDsIdle {
6946+ if mb .mfd != nil && mb .sinceLastWriteActivity () > closeFDsIdle && mb . pendingWriteSizeLocked () == 0 {
69186947 mb .dirtyCloseWithRemove (false )
69196948 }
6920-
69216949 // If our first has moved and we are set to noCompact (which is from tombstones),
69226950 // clear so that we might cleanup tombstones.
69236951 if firstMoved && mb .noCompact {
@@ -6931,12 +6959,10 @@ func (fs *fileStore) syncBlocks() {
69316959 markDirty = true
69326960 }
69336961
6962+ // Flush anything that may be pending.
6963+ mb .flushPendingMsgsLocked ()
69346964 // Check if we need to sync. We will not hold lock during actual sync.
69356965 needSync := mb .needSync
6936- if needSync {
6937- // Flush anything that may be pending.
6938- mb .flushPendingMsgsLocked ()
6939- }
69406966 mb .mu .Unlock ()
69416967
69426968 // Check if we should compact here.
@@ -7314,8 +7340,10 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
73147340 // Signals us that we need to rebuild filestore state.
73157341 var fsLostData * LostStreamData
73167342
7343+ var weakenCache bool
73177344 if mb .cache == nil {
73187345 mb .cache = mb .ecache .Value ()
7346+ weakenCache = mb .cache != nil
73197347 }
73207348 if mb .cache == nil || mb .mfd == nil {
73217349 return nil , errNoCache
@@ -7398,8 +7426,10 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
73987426 // Check last access time. If we think the block still has read interest
73997427 // then we will weaken the pointer but otherwise try to hold onto it.
74007428 if ts := ats .AccessTime (); ts < mb .llts || (ts - mb .llts ) <= int64 (mb .cexp ) {
7401- mb .cache = nil
7402- mb .ecache .Weaken ()
7429+ if weakenCache {
7430+ mb .cache = nil
7431+ mb .ecache .Weaken ()
7432+ }
74037433 mb .resetCacheExpireTimer (0 )
74047434 return fsLostData , mb .werr
74057435 }
@@ -9526,6 +9556,36 @@ func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) {
95269556// Lock should be held.
95279557func (fs * fileStore ) purgeMsgBlock (mb * msgBlock ) {
95289558 mb .mu .Lock ()
9559+ // Adjust per-subject tracking if present.
9560+ if err := mb .ensurePerSubjectInfoLoaded (); err == nil && mb .fss != nil {
9561+ mb .fss .IterFast (func (bsubj []byte , ss * SimpleState ) bool {
9562+ subj := bytesToString (bsubj )
9563+ for range ss .Msgs {
9564+ fs .removePerSubject (subj )
9565+ }
9566+ return true
9567+ })
9568+ }
9569+ // Clean up scheduled message metadata if we know this block contained any.
9570+ if fs .scheduling != nil && mb .schedules > 0 {
9571+ cacheLoaded := ! mb .cacheNotLoaded ()
9572+ if ! cacheLoaded {
9573+ cacheLoaded = mb .loadMsgsWithLock () == nil
9574+ }
9575+ if cacheLoaded {
9576+ var smv StoreMsg
9577+ fseq , lseq := atomic .LoadUint64 (& mb .first .seq ), atomic .LoadUint64 (& mb .last .seq )
9578+ for seq := fseq ; seq <= lseq ; seq ++ {
9579+ sm , err := mb .cacheLookupNoCopy (seq , & smv )
9580+ if err != nil || sm == nil {
9581+ continue
9582+ }
9583+ if schedule , ok := getMessageSchedule (sm .hdr ); ok && ! schedule .IsZero () {
9584+ fs .scheduling .remove (seq )
9585+ }
9586+ }
9587+ }
9588+ }
95299589 // Update top level accounting.
95309590 msgs , bytes := mb .msgs , mb .bytes
95319591 if msgs > fs .state .Msgs {
@@ -9537,6 +9597,8 @@ func (fs *fileStore) purgeMsgBlock(mb *msgBlock) {
95379597 fs .state .Msgs -= msgs
95389598 fs .state .Bytes -= bytes
95399599 fs .removeMsgBlock (mb )
9600+ mb .tryForceExpireCacheLocked ()
9601+ mb .finishedWithCache ()
95409602 mb .mu .Unlock ()
95419603 fs .selectNextFirst ()
95429604}
0 commit comments