feat: single-binary local distributor to live-store ingest path #6729
Conversation
Pull request overview
Adds a single-binary “local fanout” write path so the distributor can push traces to in-process consumers (live-store and metrics-generator) while allowing live-store to disable Kafka consumption and enforce lifecycle-based write rejection.
Changes:
- Introduces `LocalPushTargets` and a unified `pushLocal` path in the distributor (including local live-store `PushBytes`).
- Adds a `ConsumeFromKafka` toggle to live-store, adjusts startup/shutdown/lag logic accordingly, and wires it off in single-binary mode.
- Adds tests for local live-store ingest, lifecycle rejection, and Kafka-success/local-failure semantics.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| modules/livestore/live_store.go | Adds PushBytes, gates Kafka reader/lag logic behind ConsumeFromKafka, and updates lag behavior. |
| modules/livestore/config.go | Adds ConsumeFromKafka config flag with default enabled. |
| cmd/tempo/app/modules.go | Wires single-binary local push targets and disables live-store Kafka consumption in single-binary mode. |
| modules/distributor/distributor.go | Introduces LocalPushTargets and implements local push to generator and live-store. |
| modules/livestore/live_store_test.go | Adds tests for local ingest and lifecycle write rejection when starting/stopping. |
| modules/distributor/distributor_test.go | Adds tests for local live-store push and Kafka-success/local-failure behavior. |
Pull request overview
Adds a single-binary “local fanout” ingest path so the distributor can push traces directly to in-process consumers (live-store and metrics-generator), and introduces a live-store mode that disables Kafka consumption in single-binary wiring.
Changes:
- Introduces `LocalPushTargets` in the distributor and a unified `pushLocal` path to fan out to local live-store and/or metrics-generator.
- Adds `ConsumeFromKafka` to live-store config and refactors live-store startup/shutdown to support a no-Kafka-consumption mode with local `PushBytes` ingest.
- Adds/updates tests for local live-store ingest behavior and single-binary wiring expectations.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| modules/livestore/live_store.go | Refactors ingest startup, adds PushBytes, and gates readiness/lag logic on Kafka consumption. |
| modules/livestore/config.go | Adds internal ConsumeFromKafka flag defaulting to true. |
| cmd/tempo/app/modules.go | Wires distributor local push targets (generator + live-store) and disables live-store Kafka consumption in single-binary. |
| modules/distributor/distributor.go | Adds LocalPushTargets + local push logic for generator/live-store. |
| modules/livestore/live_store_test.go | Adds tests covering local ingest and non-Kafka start/stop behaviors. |
| modules/livestore/instance_search_test.go | Updates live-store construction for new New signature. |
| modules/distributor/distributor_test.go | Adds tests for local live-store fanout and error behavior; updates constructor calls. |
| cmd/tempo/app/modules_test.go | Adds a wiring test ensuring single-binary live-store uses local ingest mode. |
Comments suppressed due to low confidence (1)
modules/livestore/live_store.go:194
- When `ConsumeFromKafka` is false, `New` still computes `IngestConfig.Kafka.ConsumerGroup` via `ingest.LiveStoreConsumerGroupID(cfg.Ring.InstanceID)`, which requires an instance ID ending in `-<n>` and will error for single-binary IDs like `single-binary`. Gate the consumer-group/partition-ID derivation behind `ConsumeFromKafka` (or provide a safe default) so non-Kafka mode can start with arbitrary instance IDs.
```go
if cfg.ConsumeFromKafka {
	s.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.Ring.InstanceID)
	if err != nil {
		return nil, fmt.Errorf("calculating livestore partition ID: %w", err)
	}
	// TODO: It's probably easier to just use the ID directly
	// https://raintank-corp.slack.com/archives/C05CAA0ULUF/p1752847274420489
	s.cfg.IngestConfig.Kafka.ConsumerGroup, err = ingest.LiveStoreConsumerGroupID(cfg.Ring.InstanceID)
	if err != nil {
		return nil, fmt.Errorf("calculating livestore consumer group ID: %w", err)
	}
```
```go
if t.liveStore == nil {
	return nil, errors.New("live-store not initialized")
}
return t.liveStore.PushBytes(ctx, req)
```
initDistributor() wires a local live-store callback that fails if t.liveStore is nil, but the module dependency graph doesn't make Distributor depend on LiveStore in single-binary mode. Because module init/start order isn't guaranteed between sibling deps, the distributor can start accepting pushes before the live-store is initialized, causing intermittent "live-store not initialized" ingest failures. Consider adding an explicit dependency (in single-binary) so LiveStore starts before Distributor, or wire the callback only after live-store init.
Suggested change — replace:

```go
if t.liveStore == nil {
	return nil, errors.New("live-store not initialized")
}
return t.liveStore.PushBytes(ctx, req)
```

with:

```go
for {
	if t.liveStore != nil {
		return t.liveStore.PushBytes(ctx, req)
	}
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(10 * time.Millisecond):
	}
}
```
- initDistributor() stores a callback that references t.liveStore
- but t.liveStore is assigned during initLiveStore()
- and all module init happens before any services are started
So this is a noop
```go
req := &tempopb.PushBytesRequest{
	Traces: make([]tempopb.PreallocBytes, len(traces)),
	Ids:    make([][]byte, len(traces)),
}
for i, tr := range traces {
	b, err := proto.Marshal(tr.trace)
	if err != nil {
		return fmt.Errorf("failed to marshal trace for local live-store push: %w", err)
	}
	req.Traces[i].Slice = b
	req.Ids[i] = tr.id
}
```
pushTracesToLiveStore() re-marshals every *tempopb.Trace even though sendToKafka() marshals the same traces again for Kafka writes. In single-binary mode this doubles protobuf marshal work per request (and allocations), which can be significant at high ingest rates. Consider restructuring so the marshaled bytes are produced once and reused for both Kafka and the local live-store push (for example by passing the already-marshaled slices into pushTracesToLiveStore or caching them on rebatchedTrace).
This is just temporary
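For illustration, one possible restructuring along the lines of the comment above, as a hedged sketch: marshal each rebatched trace once and hand the same byte slices to both the Kafka writer and the local live-store push. The extra parameters on sendToKafka/pushTracesToLiveStore are hypothetical, not the PR's actual signatures.

```go
// Hedged sketch only: marshal once, reuse for both write paths.
marshaled := make([][]byte, len(rebatchedTraces))
for i, tr := range rebatchedTraces {
	b, err := proto.Marshal(tr.trace)
	if err != nil {
		return fmt.Errorf("marshaling trace: %w", err)
	}
	marshaled[i] = b
}

// Both writers consume the same pre-marshaled slices (hypothetical signatures).
if err := d.sendToKafka(ctx, userID, rebatchedTraces, marshaled); err != nil {
	return err
}
return d.pushTracesToLiveStore(ctx, userID, rebatchedTraces, marshaled)
```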
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e34d38792e
```go
// PushSpansFunc is a callback used to push spans to a local in-process consumer
// without gRPC/ring indirection (single-binary mode).
type PushSpansFunc func(ctx context.Context, req *tempopb.PushSpansRequest) (*tempopb.PushResponse, error)

// PushBytesFunc is a callback used to push pre-marshaled traces to a local
// in-process consumer without Kafka indirection (single-binary mode).
type PushBytesFunc func(ctx context.Context, req *tempopb.PushBytesRequest) (*tempopb.PushResponse, error)

// LocalPushTargets contains optional local in-process push callbacks.
type LocalPushTargets struct {
	Generator PushSpansFunc
	LiveStore PushBytesFunc
}
```
Nit: this can be made less ad-hoc with generics.
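For illustration, a minimal sketch of the generics idea, assuming the same `tempopb` request/response types; the `PushFunc` name is hypothetical, not code from the PR:

```go
// PushFunc is a hypothetical generic local-push callback covering both
// request types, replacing the separate PushSpansFunc/PushBytesFunc.
type PushFunc[Req any] func(ctx context.Context, req *Req) (*tempopb.PushResponse, error)

// LocalPushTargets would then instantiate it per consumer.
type LocalPushTargets struct {
	Generator PushFunc[tempopb.PushSpansRequest]
	LiveStore PushFunc[tempopb.PushBytesRequest]
}
```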
```go
d.pushTracesToGenerator(ctx, userID, ringTokens, rebatchedTraces)
if err := d.pushLocal(ctx, userID, ringTokens, rebatchedTraces); err != nil {
	level.Error(d.logger).Log("msg", "failed to push to local consumers", "err", err, "tenant", userID)
	return nil, err
```
I think this code is becoming too convoluted. pushTracesKafka is always called because pushSpansToKafka is always set to true. Then we have pushLocal, which is coded as a complementary write path, but it should be an alternative to pushing to Kafka.
If the in-process write to the live-store fails because it isn't ready or is waiting on backpressure, the entire write fails and we end up with duplicated data.
I believe these changes are in the right direction, but it's difficult to follow where this is going right now.
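To make the suggestion concrete, a rough sketch of the alternative (either/or) shape, using a hypothetical mode flag rather than anything from the PR:

```go
// Hedged sketch: write to exactly one backend instead of both.
if d.localPushEnabled { // hypothetical single-binary mode flag
	// Hand traces to the in-process consumers only.
	if err := d.pushLocal(ctx, userID, ringTokens, rebatchedTraces); err != nil {
		return nil, err
	}
} else {
	// Microservices mode: write to Kafka for live-store and block-builder.
	if err := d.pushTracesKafka(ctx, userID, ringTokens, rebatchedTraces); err != nil {
		return nil, err
	}
}
```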
```go
	return t.generator.PushSpans(ctx, req)
}

localPushTargets.LiveStore = func(ctx context.Context, req *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) {
```
Now that the distributor depends on the live-store when monolithic, it should be handled in the module dependency tree
Good catch. I have added the live-store and the generator as dependencies of the distributor for single-binary mode.
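A hedged sketch of what that wiring could look like in the dskit-style dependency map in cmd/tempo/app/modules.go; the constant names and the single-binary check are illustrative, not copied from the PR:

```go
deps := map[string][]string{
	// ...existing module dependencies...
}
if singleBinary { // however the target check is expressed in modules.go
	// Initialize live-store and metrics-generator before the distributor so
	// the local push callbacks always have a valid target.
	deps[Distributor] = append(deps[Distributor], LiveStore, Generator)
}
```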
```go
if err := d.pushTracesKafka(ctx, userID, ringTokens, rebatchedTraces); err != nil {
	level.Error(d.logger).Log("msg", "failed to write to kafka", "err", err, "tenant", userID)
	return nil, err
}

// In single-binary mode we also push directly to metrics-generator in-process,
// while still writing to Kafka for live-store and block-builder.
if d.pushSpansToLocalFunc != nil {
	d.pushTracesToGenerator(ctx, userID, ringTokens, rebatchedTraces)
	if err := d.pushLocal(ctx, userID, ringTokens, rebatchedTraces); err != nil {
		level.Error(d.logger).Log("msg", "failed to push to local consumers", "err", err, "tenant", userID)
		return nil, err
	}
```
PushTraces() writes to Kafka first and then calls pushLocal(). If the local live-store push fails after Kafka has already accepted the records, PushTraces() returns an error and the client may retry, producing duplicate Kafka writes (and still a partial success where downstream Kafka consumers see the data but the single-binary live-store may not). Could we either (a) push to local live-store before Kafka, or (b) treat local live-store failures as non-fatal once the Kafka write succeeded (or add compensation), to avoid retry-induced duplication/partial success?
This is just an intermediate state. In the final one it will write either to Kafka or via the callback.
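For reference, a hedged sketch of option (b) from the comment above — treating a local live-store failure as non-fatal once Kafka has accepted the write, so client retries don't duplicate Kafka records. This is not the PR's code:

```go
// Hedged sketch: Kafka failure stays fatal, local failure is logged only.
if err := d.pushTracesKafka(ctx, userID, ringTokens, rebatchedTraces); err != nil {
	return nil, err // safe for the client to retry, nothing was accepted
}
if d.pushSpansToLocalFunc != nil {
	if err := d.pushLocal(ctx, userID, ringTokens, rebatchedTraces); err != nil {
		// Kafka already holds the data; returning an error here would only
		// trigger duplicate writes on retry.
		level.Error(d.logger).Log("msg", "local push failed after kafka success", "err", err, "tenant", userID)
	}
}
```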
```go
// Stop consuming
err := services.StopAndAwaitTerminated(context.Background(), s.reader)
if err != nil {
	level.Warn(s.logger).Log("msg", "failed to stop reader", "err", err)
	return err
}

// Reset lag metrics for our partition when stopping
ingest.ResetLagMetricsForRevokedPartitions(s.cfg.IngestConfig.Kafka.ConsumerGroup, []int32{s.ingestPartitionID})
```
In stopping(), services.StopAndAwaitTerminated(..., s.reader) is called whenever ConsumeFromKafka is true, but s.reader can still be nil if startup failed before the partition reader was created/started (or if the service was never fully started). Passing a typed-nil service can panic when the stop path calls methods on it. Can we guard this with if s.reader != nil (and similarly only reset lag metrics / stop lag worker when those were actually started)?
Suggested change — replace:

```go
// Stop consuming
err := services.StopAndAwaitTerminated(context.Background(), s.reader)
if err != nil {
	level.Warn(s.logger).Log("msg", "failed to stop reader", "err", err)
	return err
}
// Reset lag metrics for our partition when stopping
ingest.ResetLagMetricsForRevokedPartitions(s.cfg.IngestConfig.Kafka.ConsumerGroup, []int32{s.ingestPartitionID})
```

with:

```go
if s.reader != nil {
	// Stop consuming.
	err := services.StopAndAwaitTerminated(context.Background(), s.reader)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to stop reader", "err", err)
		return err
	}
	// Reset lag metrics for our partition when stopping.
	ingest.ResetLagMetricsForRevokedPartitions(s.cfg.IngestConfig.Kafka.ConsumerGroup, []int32{s.ingestPartitionID})
}
```
```diff
@@ -699,7 +706,7 @@ func (t *App) initLiveStore() (services.Service, error) {
 	t.cfg.LiveStore.WAL.Version = t.cfg.StorageConfig.Trace.Block.Version

 	var err error
-	t.liveStore, err = livestore.New(t.cfg.LiveStore, t.Overrides, log.Logger, prometheus.DefaultRegisterer, singlePartition)
+	t.liveStore, err = livestore.New(t.cfg.LiveStore, t.Overrides, log.Logger, prometheus.DefaultRegisterer)
```
PR introduces a user-facing ingest-path change for single-binary (local distributor → live-store ingest), but the PR checklist indicates CHANGELOG.md hasn't been updated. Could you add a changelog entry in the required format so the feature is captured in release notes?
What this PR does:
Adds distributor local fanout via `LocalPushTargets` with a unified `pushLocal` path.
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`
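For reference, a hypothetical entry in that format could read: `* [FEATURE] Distributor: add single-binary local push path to live-store and metrics-generator #6729`.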