Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* [BUGFIX] live-store: fixed unsuccessful deregistering from membership/partition rings during shutdown [#6848](https://github.com/grafana/tempo/pull/6848) (@zhxiaogg)
* [BUGFIX] fix: respect context cancellation when reading WAL block iterator [#6928](https://github.com/grafana/tempo/pull/6928) (@zhxiaogg)
* [BUGFIX] Complete lifecycler shutdown on errors [#6906](https://github.com/grafana/tempo/pull/6906) (@javiermolinar)
* [BUGFIX] livestore: fix concurrent WAL writes from periodic and shutdown flushes [#6972](https://github.com/grafana/tempo/pull/6972) (@zhxiaogg)

### 3.0 Cleanup

Expand Down
12 changes: 9 additions & 3 deletions modules/livestore/live_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type LiveStore struct {
lagCancel context.CancelFunc
readyErr atomic.Pointer[error] // nil when ready to serve queries
lastRecordTimeNanos atomic.Int64 // stores timestamp of last consumed record as UnixNano, -1 means not set

cutToWalStop chan struct{} // closed to stop perTenantCutToWalLoop goroutines before shutdown flush
cutToWalWg sync.WaitGroup // tracks active perTenantCutToWalLoop goroutines
}

func New(cfg Config, overridesService overrides.Interface, completeBlockFlusher completeBlockFlusher, logger log.Logger, reg prometheus.Registerer) (*LiveStore, error) {
Expand Down Expand Up @@ -180,6 +183,7 @@ func New(cfg Config, overridesService overrides.Interface, completeBlockFlusher
completeBlockLifecycle: completeBlockLifecycle,
completeQueues: flushqueues.New[*completeOp](metricCompleteQueueLength),
startupComplete: make(chan struct{}),
cutToWalStop: make(chan struct{}),
}

// Initialize ready state to starting
Expand Down Expand Up @@ -451,6 +455,10 @@ func (s *LiveStore) stopping(error) error {
stopErr = errors.Join(stopErr, err)
}

level.Info(s.logger).Log("msg", "stopping periodic WAL flush goroutines")
s.stopAllCutToWalLoops()
level.Info(s.logger).Log("msg", "periodic WAL flush goroutines stopped")
Comment thread
zhxiaogg marked this conversation as resolved.

// Flush all data to disk.
level.Info(s.logger).Log("msg", "cutting all instances to WAL")
s.cutAllInstancesToWal()
Expand Down Expand Up @@ -690,9 +698,7 @@ func (s *LiveStore) getOrCreateInstance(tenantID string) (*instance, error) {

s.instances[tenantID] = inst

s.runInBackground(func() {
s.perTenantCutToWalLoop(inst)
})
s.startPerTenantCutToWalLoop(inst)
s.runInBackground(func() {
s.perTenantCleanupLoop(inst)
})
Expand Down
37 changes: 28 additions & 9 deletions modules/livestore/live_store_background.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,38 @@ func (s *LiveStore) retryCompleteOp(op *completeOp, span oteltrace.Span, msg str
}()
}

func (s *LiveStore) perTenantCutToWalLoop(instance *instance) {
// ticker
ticker := time.NewTicker(s.cfg.InstanceFlushPeriod)
defer ticker.Stop()
func (s *LiveStore) startPerTenantCutToWalLoop(inst *instance) {
s.cutToWalWg.Add(1)
Comment thread
zhxiaogg marked this conversation as resolved.
go func() {
defer s.cutToWalWg.Done()

for {
// Wait for startup to finish; also listen on cutToWalStop so we can
// exit if shutdown happens before startup completes.
select {
case <-ticker.C:
s.cutOneInstanceToWal(s.ctx, instance, false)
case <-s.ctx.Done():
case <-s.startupComplete:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment that we will continue if startupComplete is met first we will continue. Copilot also mentions there is not a context check here, thats probably implicit in the other two but if we an either A pass it in, or document the behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment and PR description for the livestore startup/shutdown process around WAL completion.

case <-s.cutToWalStop:
return
Comment thread
zhxiaogg marked this conversation as resolved.
}
}

ticker := time.NewTicker(s.cfg.InstanceFlushPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
s.cutOneInstanceToWal(s.ctx, inst, false)
case <-s.cutToWalStop:
return
case <-s.ctx.Done():
return
}
}
}()
}

func (s *LiveStore) stopAllCutToWalLoops() {
close(s.cutToWalStop)
s.cutToWalWg.Wait()
}
Comment thread
zhxiaogg marked this conversation as resolved.

func (s *LiveStore) perTenantCleanupLoop(inst *instance) {
Expand Down
15 changes: 8 additions & 7 deletions modules/livestore/live_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ func TestLiveStorePushBytesRejectsWhenStopping(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, liveStore)

// Transition to stopping state, then verify writes are rejected.
_ = liveStore.stopping(nil)
// Stop the service so writes are rejected.
err = services.StopAndAwaitTerminated(context.Background(), liveStore)
Comment thread
zhxiaogg marked this conversation as resolved.
require.NoError(t, err)
Comment thread
zhxiaogg marked this conversation as resolved.

id := test.ValidTraceID(nil)
expectedTrace := test.MakeTrace(1, id)
Expand Down Expand Up @@ -700,7 +701,7 @@ func TestLiveStoreShutdownWithPendingCompletions(t *testing.T) {
requireTraceInLiveStore(t, liveStore, expectedID, expectedTrace)
requireInstanceState(t, inst, instanceState{liveTraces: 1, walBlocks: 0, completeBlocks: 0})

require.NoError(t, liveStore.stopping(nil))
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), liveStore))
Comment thread
zhxiaogg marked this conversation as resolved.
}

func TestLiveStoreQueryMethodsBeforeStarted(t *testing.T) {
Expand Down Expand Up @@ -843,8 +844,8 @@ func TestLiveStoreQueryMethodsAfterStopping(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, liveStore)

// Error expected from Kafka reader shutdown; we only care about query behavior after stopping begins.
_ = liveStore.stopping(nil)
// Stop the service so queries are rejected.
_ = services.StopAndAwaitTerminated(context.Background(), liveStore)
Comment thread
zhxiaogg marked this conversation as resolved.

ctx := user.InjectOrgID(context.Background(), testTenantID)

Expand Down Expand Up @@ -876,8 +877,8 @@ func TestLiveStoreQueryMethodsAfterStoppingWithFailOnHighLag(t *testing.T) {

liveStore.cfg.FailOnHighLag = true

// Error expected from Kafka reader shutdown; we only care about query behavior after stopping begins.
_ = liveStore.stopping(nil)
// Stop the service so queries are rejected.
_ = services.StopAndAwaitTerminated(context.Background(), liveStore)

Comment thread
zhxiaogg marked this conversation as resolved.
ctx := user.InjectOrgID(context.Background(), testTenantID)

Expand Down
Loading