Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* [BUGFIX] live-store: fixed unsuccessful deregistering from membership/partition rings during shutdown [#6848](https://github.com/grafana/tempo/pull/6848) (@zhxiaogg)
* [BUGFIX] fix: respect context cancellation when reading WAL block iterator [#6928](https://github.com/grafana/tempo/pull/6928) (@zhxiaogg)
* [BUGFIX] Complete lifecycler shutdown on errors [#6906](https://github.com/grafana/tempo/pull/6906) (@javiermolinar)
* [BUGFIX] livestore: fix concurrent WAL writes from periodic and shutdown flushes [#6972](https://github.com/grafana/tempo/pull/6972) (@zhxiaogg)

### 3.0 Cleanup

Expand Down
9 changes: 9 additions & 0 deletions modules/livestore/live_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type LiveStore struct {
lagCancel context.CancelFunc
readyErr atomic.Pointer[error] // nil when ready to serve queries
lastRecordTimeNanos atomic.Int64 // stores timestamp of last consumed record as UnixNano, -1 means not set

cutToWalStop chan struct{} // closed to stop perTenantCutToWalLoop goroutines before shutdown flush
cutToWalWg sync.WaitGroup // tracks active perTenantCutToWalLoop goroutines
}

func New(cfg Config, overridesService overrides.Interface, completeBlockFlusher completeBlockFlusher, logger log.Logger, reg prometheus.Registerer) (*LiveStore, error) {
Expand Down Expand Up @@ -180,6 +183,7 @@ func New(cfg Config, overridesService overrides.Interface, completeBlockFlusher
completeBlockLifecycle: completeBlockLifecycle,
completeQueues: flushqueues.New[*completeOp](metricCompleteQueueLength),
startupComplete: make(chan struct{}),
cutToWalStop: make(chan struct{}),
}

// Initialize ready state to starting
Expand Down Expand Up @@ -451,6 +455,11 @@ func (s *LiveStore) stopping(error) error {
stopErr = errors.Join(stopErr, err)
}

level.Info(s.logger).Log("msg", "stopping periodic WAL flush goroutines")
close(s.cutToWalStop)
s.cutToWalWg.Wait()
level.Info(s.logger).Log("msg", "periodic WAL flush goroutines stopped")
Comment thread
zhxiaogg marked this conversation as resolved.

// Flush all data to disk.
level.Info(s.logger).Log("msg", "cutting all instances to WAL")
s.cutAllInstancesToWal()
Expand Down
5 changes: 4 additions & 1 deletion modules/livestore/live_store_background.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func (s *LiveStore) retryCompleteOp(op *completeOp, span oteltrace.Span, msg str
}

func (s *LiveStore) perTenantCutToWalLoop(instance *instance) {
s.cutToWalWg.Add(1)
Comment thread
zhxiaogg marked this conversation as resolved.
defer s.cutToWalWg.Done()

// ticker
ticker := time.NewTicker(s.cfg.InstanceFlushPeriod)
defer ticker.Stop()
Expand All @@ -184,7 +187,7 @@ func (s *LiveStore) perTenantCutToWalLoop(instance *instance) {
select {
case <-ticker.C:
s.cutOneInstanceToWal(s.ctx, instance, false)
case <-s.ctx.Done():
case <-s.cutToWalStop:
return
Comment thread
zhxiaogg marked this conversation as resolved.
}
}
Expand Down
15 changes: 8 additions & 7 deletions modules/livestore/live_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ func TestLiveStorePushBytesRejectsWhenStopping(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, liveStore)

// Transition to stopping state, then verify writes are rejected.
_ = liveStore.stopping(nil)
// Stop the service so writes are rejected.
err = services.StopAndAwaitTerminated(context.Background(), liveStore)
Comment thread
zhxiaogg marked this conversation as resolved.
require.NoError(t, err)
Comment thread
zhxiaogg marked this conversation as resolved.

id := test.ValidTraceID(nil)
expectedTrace := test.MakeTrace(1, id)
Expand Down Expand Up @@ -700,7 +701,7 @@ func TestLiveStoreShutdownWithPendingCompletions(t *testing.T) {
requireTraceInLiveStore(t, liveStore, expectedID, expectedTrace)
requireInstanceState(t, inst, instanceState{liveTraces: 1, walBlocks: 0, completeBlocks: 0})

require.NoError(t, liveStore.stopping(nil))
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), liveStore))
Comment thread
zhxiaogg marked this conversation as resolved.
}

func TestLiveStoreQueryMethodsBeforeStarted(t *testing.T) {
Expand Down Expand Up @@ -843,8 +844,8 @@ func TestLiveStoreQueryMethodsAfterStopping(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, liveStore)

// Error expected from Kafka reader shutdown; we only care about query behavior after stopping begins.
_ = liveStore.stopping(nil)
// Stop the service so queries are rejected.
_ = services.StopAndAwaitTerminated(context.Background(), liveStore)
Comment thread
zhxiaogg marked this conversation as resolved.

ctx := user.InjectOrgID(context.Background(), testTenantID)

Expand Down Expand Up @@ -876,8 +877,8 @@ func TestLiveStoreQueryMethodsAfterStoppingWithFailOnHighLag(t *testing.T) {

liveStore.cfg.FailOnHighLag = true

// Error expected from Kafka reader shutdown; we only care about query behavior after stopping begins.
_ = liveStore.stopping(nil)
// Stop the service so queries are rejected.
_ = services.StopAndAwaitTerminated(context.Background(), liveStore)

Comment thread
zhxiaogg marked this conversation as resolved.
ctx := user.InjectOrgID(context.Background(), testTenantID)

Expand Down
Loading