Conversation
Signed-off-by: Pavol Loffay <p.loffay@gmail.com>
Pull request overview
This PR reintroduces an ingester-based write path to support “kafkaless” Tempo deployments (Distributor → Ingester/WAL → object storage) while keeping the existing Kafka-based architecture for distributed setups.
Changes:
- Add an ingester write path in the distributor (ring-based fanout using PushBytesV2) and supporting discard/error accounting.
- Allow the querier to query a non-partition ring directly when the partition ring is not configured (kafkaless mode).
- Update app wiring/config to introduce ingester module/ring usage, tweak single-binary defaults, and add runtime override validation + tests.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 13 comments.
Summary per file:
| File | Description |
|---|---|
| modules/querier/querier.go | Adds a kafkaless fallback path that queries a plain ring when partition ring is absent. |
| modules/distributor/receiver/shim.go | Updates help text for distributor_push_duration_seconds. |
| modules/distributor/distributor.go | Implements ingester write path via ring + pool, adds new metrics, discard accounting, and adjusts Kafka timeout usage. |
| modules/distributor/distributor_test.go | Adds tests and test scaffolding for ingester write path/discard logging; updates helpers to return ingester mocks. |
| modules/distributor/config.go | Adds new config toggles for write path and write-extension behavior. |
| cmd/tempo/main.go | Forces ingester ring defaults in single-binary mode. |
| cmd/tempo/app/overrides_validation.go | Adds validation tying tenant shard size to ingester replication factor. |
| cmd/tempo/app/overrides_validation_test.go | Adds unit tests for the new runtime override validation. |
| cmd/tempo/app/modules.go | Wires in ingester module + ingester ring; changes distributor/querier init to defer until rings are available; adjusts single-binary composition. |
| cmd/tempo/app/config.go | Adds ingester + ingester client config to top-level config; updates warning text for complete block timeout. |
| cmd/tempo/app/config_test.go | Updates config warning test data after warning source moved from live-store to ingester. |
| cmd/tempo/app/app.go | Adds ingester readiness check to /ready. |
```go
// forReplicationSet runs f for all instances in a single replication set (kafkaless mode).
func forReplicationSet[R any, TClient any](ctx context.Context, q *Querier, rs ring.ReplicationSet, f func(context.Context, TClient) (R, error)) ([]R, error) {
	var results []R
	var mu sync.Mutex
	var wg sync.WaitGroup
	var firstErr error

	for _, instance := range rs.Instances {
		instance := instance
		wg.Add(1)
		go func() {
			defer wg.Done()
			client, err := q.liveStorePool.GetClientForInstance(instance)
			if err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
				return
			}
			result, err := f(ctx, client.(TClient))
			if err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
				return
			}
			mu.Lock()
			results = append(results, result)
			mu.Unlock()
		}()
	}
	wg.Wait()

	if firstErr != nil && len(results) == 0 {
		return nil, firstErr
	}
```
In kafkaless mode this queries all instances in the replication set concurrently and then drops errors as long as at least one result succeeded. That’s a different correctness/perf model than the partition-ring path (which uses ring.DoUntilQuorum + minimize/hedging). Could we reuse ring.DoUntilQuorum (or concurrency.ForEachJobMergeResults over a single set) here so we stop at quorum and return errors consistently?
Suggested change:

```go
// forReplicationSet runs f for a single replication set (kafkaless mode) and returns
// once the replication-set quorum has been satisfied. If quorum cannot be reached,
// it returns the aggregated errors from the failed replicas.
func forReplicationSet[R any, TClient any](ctx context.Context, q *Querier, rs ring.ReplicationSet, f func(context.Context, TClient) (R, error)) ([]R, error) {
	if len(rs.Instances) == 0 {
		return nil, nil
	}
	requiredResults := len(rs.Instances) - rs.MaxErrors
	if requiredResults < 1 {
		requiredResults = 1
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	results := make([]R, 0, requiredResults)
	var mu sync.Mutex
	var wg sync.WaitGroup
	var errs error
	for _, instance := range rs.Instances {
		instance := instance
		wg.Add(1)
		go func() {
			defer wg.Done()
			client, err := q.liveStorePool.GetClientForInstance(instance)
			if err != nil {
				mu.Lock()
				errs = multierr.Append(errs, err)
				mu.Unlock()
				return
			}
			result, err := f(ctx, client.(TClient))
			if err != nil {
				mu.Lock()
				quorumReached := len(results) >= requiredResults
				if !(quorumReached && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded))) {
					errs = multierr.Append(errs, err)
				}
				mu.Unlock()
				return
			}
			mu.Lock()
			results = append(results, result)
			if len(results) >= requiredResults {
				cancel()
			}
			mu.Unlock()
		}()
	}
	wg.Wait()
	if len(results) < requiredResults {
		if errs != nil {
			return nil, errs
		}
		return nil, fmt.Errorf("failed to reach replication-set quorum: got %d/%d successful results", len(results), requiredResults)
	}
```
```diff
 Namespace: "tempo",
 Name:      "distributor_push_duration_seconds",
-Help:      "Records the amount of time to process and route a batch through the distributor.",
+Help:      "Records the amount of time to push a batch to the ingester.",
```
This metric is emitted for all distributor pushes (Kafka, local live-store, ingester write path). Updating the help text to "push a batch to the ingester" seems inaccurate/misleading in Kafka mode; consider making it path-agnostic again (e.g., "process and route"), or introducing a separate metric for ingester pushes.
Suggested change:

```diff
-Help: "Records the amount of time to push a batch to the ingester.",
+Help: "Records the amount of time to process and route a pushed batch.",
```
```go
// KafkaWritePathEnabled enables the Kafka write path for Kafka-based deployments.
KafkaWritePathEnabled bool `yaml:"kafka_write_path_enabled"`
```
KafkaWritePathEnabled is a new YAML-exposed config field but it isn't referenced anywhere in the codebase (and no flag is registered). Shipping an unused user-facing knob is likely to confuse operators; could we either wire it into the write-path selection logic or remove it before release?
Suggested change:

```diff
-// KafkaWritePathEnabled enables the Kafka write path for Kafka-based deployments.
-KafkaWritePathEnabled bool `yaml:"kafka_write_path_enabled"`
+// KafkaWritePathEnabled is reserved for internal use until it is wired into write-path selection.
+// Keep this out of YAML so we don't ship an operator-facing knob that has no effect.
+KafkaWritePathEnabled bool `yaml:"-"`
```
```diff
 }
 return ring.DoBatchWithOptions(ctx, ring.Write, ring.NewActivePartitionBatchRing(partitionRing), keys, func(partition ring.InstanceDesc, indexes []int) error {
-	localCtx, cancel := context.WithTimeout(ctx, d.cfg.KafkaConfig.WriteTimeout)
+	localCtx, cancel := context.WithTimeout(ctx, d.clientCfg.RemoteTimeout)
```
This changes the Kafka produce timeout from KafkaConfig.WriteTimeout to IngesterClient.RemoteTimeout. Those are unrelated knobs (gRPC client timeout vs Kafka write timeout), and it likely makes Kafka latency tuning confusing. Should this stay on the Kafka config timeout instead?
Suggested change:

```diff
-localCtx, cancel := context.WithTimeout(ctx, d.clientCfg.RemoteTimeout)
+localCtx, cancel := context.WithTimeout(ctx, d.cfg.KafkaConfig.WriteTimeout)
```
```go
if len(pushResponse.ErrorsByTrace) == 0 {
	for _, reqBatchIndex := range indexes {
		if reqBatchIndex > numOfTraces {
			level.Warn(d.logger).Log("msg", fmt.Sprintf("batch index %d out of bound for length %d", reqBatchIndex, numOfTraces))
			continue
		}
		numSuccessByTraceIndex[reqBatchIndex]++
```
In the no-error response path, this bounds check should be >= numOfTraces (not >) to avoid a panic when reqBatchIndex == numOfTraces (slice is 0..numOfTraces-1).
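A minimal, self-contained sketch of the corrected guard (the standalone function and its simplified types are assumptions for illustration; only the `>=` comparison mirrors the actual fix):

```go
package main

import "fmt"

// countSuccesses sketches the no-error accounting loop from the diff, with
// the bound corrected to >= so that reqBatchIndex == numOfTraces is skipped
// instead of indexing one element past the end of the slice.
func countSuccesses(indexes []int, numSuccessByTraceIndex []int) []int {
	numOfTraces := len(numSuccessByTraceIndex)
	for _, reqBatchIndex := range indexes {
		if reqBatchIndex >= numOfTraces { // was `>`, which panics when reqBatchIndex == numOfTraces
			fmt.Printf("batch index %d out of bounds for length %d\n", reqBatchIndex, numOfTraces)
			continue
		}
		numSuccessByTraceIndex[reqBatchIndex]++
	}
	return numSuccessByTraceIndex
}
```

With the original `>` comparison, an index equal to the slice length would pass the check and panic on the increment; `>=` logs and skips it instead.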
```go
if config.Ingestion.TenantShardSize != 0 {
	ingesterReplicationFactor := r.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor
	if config.Ingestion.TenantShardSize < ingesterReplicationFactor {
		return warnings, fmt.Errorf("ingester.tenant.shard_size is lower than replication factor (%d < %d)", config.Ingestion.TenantShardSize, ingesterReplicationFactor)
```
This introduces a hard validation error for per-tenant runtime overrides. Per Tempo’s fail-open approach for multi-tenant overrides, a bad tenant override shouldn’t prevent Tempo from starting (or loading other tenants’ overrides). Could this be downgraded to a warning (or clamp/ignore the value) instead of returning an error?
Suggested change:

```diff
-return warnings, fmt.Errorf("ingester.tenant.shard_size is lower than replication factor (%d < %d)", config.Ingestion.TenantShardSize, ingesterReplicationFactor)
+warnings = append(warnings, fmt.Errorf("ingester.tenant.shard_size is lower than replication factor (%d < %d)", config.Ingestion.TenantShardSize, ingesterReplicationFactor))
```
```go
	if err := d.StartAsync(ctx); err != nil {
		return err
	}
	return d.AwaitRunning(ctx)
},
// running
func(ctx context.Context) error {
	<-ctx.Done()
	return nil
```
These wrappers start a child service (ingesterRing/distributor/querier) but the running function only waits on ctx.Done(), so if the child service fails/stops unexpectedly the module manager won’t observe it. Could we also select on the child service’s failure channel (or otherwise propagate failures) so the process can fail fast and readiness reflects reality? (Same pattern appears in initIngesterRing and initQuerier.)
```go
// Check if Kafka is configured to determine which ring to use
var liveStoreRing ring.ReadRing
kafkaConfigured := t.cfg.Ingest.Kafka.Topic != ""
if kafkaConfigured && t.readRings[ringLiveStore] != nil {
	liveStoreRing = t.readRings[ringLiveStore]
} else {
	liveStoreRing = t.readRings[ringIngester]
}
if liveStoreRing == nil {
	return fmt.Errorf("live-store or ingester ring not initialized")
}
```
In Kafka mode, falling back to the ingester ring when the live-store ring isn’t initialized will make “recent” queries silently look at the wrong component and return incomplete results. If Kafka is configured, should this return an error unless the live-store ring is available (rather than falling back)?
Suggested change:

```go
var liveStoreRing ring.ReadRing
kafkaConfigured := t.cfg.Ingest.Kafka.Topic != ""
if kafkaConfigured {
	liveStoreRing = t.readRings[ringLiveStore]
	if liveStoreRing == nil {
		return fmt.Errorf("live-store ring not initialized for Kafka mode")
	}
} else {
	liveStoreRing = t.readRings[ringIngester]
	if liveStoreRing == nil {
		return fmt.Errorf("ingester ring not initialized")
	}
}
```
```diff
 // composite targets
-SingleBinary: {BackendScheduler, BackendWorker, QueryFrontend, Querier, Distributor, MetricsGenerator, BlockBuilder, LiveStore},
+SingleBinary: {QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator},
```
-target=all no longer includes backend-scheduler/backend-worker/block-builder/live-store, but other code paths (and comments above) assume backend-worker will handle store polling in single-binary mode. As-is, single-binary may stop polling blocklists entirely, breaking search/sharding behavior. Should single-binary keep backend-worker (and likely scheduler) or explicitly enable polling elsewhere when running as all?
Suggested change:

```diff
-SingleBinary: {QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator},
+// Keep backend scheduler/worker in single-binary mode so `-target=all` continues to
+// run the backend polling path used for blocklist updates/search sharding.
+SingleBinary: {QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator, BackendScheduler, BackendWorker},
```
```diff
 t.cfg.Distributor.KafkaConfig = t.cfg.Ingest.Kafka
-t.cfg.Distributor.PushSpansToKafka = true
+// Enable Kafka write path when Kafka is configured (topic is set) and we're not in single binary mode
+kafkaConfigured := t.cfg.Ingest.Kafka.Topic != ""
+if kafkaConfigured && !singleBinary {
+	t.cfg.Distributor.PushSpansToKafka = true
+	t.cfg.Distributor.IngesterWritePathEnabled = false
+} else {
+	t.cfg.Distributor.PushSpansToKafka = false
+	t.cfg.Distributor.IngesterWritePathEnabled = true
+}
```
This PR reintroduces a user-facing deployment mode/config surface (kafkaless ingester write path), but there’s no CHANGELOG.md entry in the diff. Could we add a changelog entry so operators discover the new mode and any config changes?
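For reference, a minimal entry following the checklist's ordering convention could look like this (the PR number is a placeholder, not the real one):

```markdown
* [FEATURE] Reintroduce the ingester write path to support kafkaless Tempo deployments (Distributor → Ingester/WAL → object storage). #<PR number>
```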
|
Thank you for the PR @pavolloffay, but we are committed to deleting all of the ingester code. That said, we agree that requiring Kafka for single-binary mode is not sustainable, which is why we have been working towards this kafkaless mode; see #6729 and #6618. The only missing part is flushing to backend storage, and that will come next. We want to complete all of that work before the official release of 3.0.
What this PR does:
This PR brings back the ingester-based write path, allowing Tempo 3.0 to be deployed without Kafka. This enables two deployment models:
- Kafkaless: Distributor → Ingester/WAL → object storage (e.g. single-binary deployments).
- Kafka-based: the existing architecture for distributed setups.
Motivation
Alternatives
An alternative could be to make the Tempo architecture pluggable to allow maintaining the ingester separately (e.g. in some Grafana contrib repository?).
Which issue(s) this PR fixes:
Related to #6073
Related to #4077
Checklist
- [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`