diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 99601ab9da7..b083c8b956c 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/tempo/modules/distributor/receiver" frontend_v1 "github.com/grafana/tempo/modules/frontend/v1" "github.com/grafana/tempo/modules/generator" + "github.com/grafana/tempo/modules/ingester" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/querier" "github.com/grafana/tempo/modules/storage" @@ -74,6 +75,7 @@ type App struct { distributor *distributor.Distributor querier *querier.Querier frontend *frontend_v1.Frontend + ingester *ingester.Ingester generator *generator.Generator blockBuilder *blockbuilder.BlockBuilder store storage.Store @@ -345,6 +347,16 @@ func (t *App) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool) return } + // Ingester has a special check that makes sure that it was able to register into the ring, + // and that all other ring entries are OK too. + if t.ingester != nil { + if err := t.ingester.CheckReady(r.Context()); err != nil { + http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable) + return + } + } + + // Generator has a dedicated readiness check for generator-specific dependencies. if t.generator != nil { if err := t.generator.CheckReady(r.Context()); err != nil { http.Error(w, "Generator not ready: "+err.Error(), http.StatusServiceUnavailable) diff --git a/cmd/tempo/app/config.go b/cmd/tempo/app/config.go index 85baa2f65e5..a29804c4d15 100644 --- a/cmd/tempo/app/config.go +++ b/cmd/tempo/app/config.go @@ -16,6 +16,8 @@ import ( "github.com/grafana/tempo/modules/distributor" "github.com/grafana/tempo/modules/frontend" "github.com/grafana/tempo/modules/generator" + "github.com/grafana/tempo/modules/ingester" + ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/livestore" livestore_client "github.com/grafana/tempo/modules/livestore/client" "github.com/grafana/tempo/modules/overrides" @@ -52,10 +54,12 @@ type Config struct { Server server.Config `yaml:"server,omitempty"` InternalServer internalserver.Config `yaml:"internal_server,omitempty"` Distributor distributor.Config `yaml:"distributor,omitempty"` + IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` MetricsGeneratorClient map[string]any `yaml:"metrics_generator_client,omitempty"` // Deprecated: kept for one-release config compatibility. 
LiveStoreClient livestore_client.Config `yaml:"live_store_client,omitempty"` Querier querier.Config `yaml:"querier,omitempty"` Frontend frontend.Config `yaml:"query_frontend,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` Generator generator.Config `yaml:"metrics_generator,omitempty"` Ingest ingest.Config `yaml:"ingest,omitempty"` BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"` @@ -139,11 +143,14 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { // Everything else flagext.DefaultValues(&c.LiveStoreClient) c.LiveStoreClient.GRPCClientConfig.GRPCCompression = defaultGRPCCompression + flagext.DefaultValues(&c.IngesterClient) + c.IngesterClient.GRPCClientConfig.GRPCCompression = defaultGRPCCompression flagext.DefaultValues(&c.BackenSchedulerClient) c.BackenSchedulerClient.GRPCClientConfig.GRPCCompression = defaultGRPCCompression c.Overrides.RegisterFlagsAndApplyDefaults(f) c.Distributor.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "distributor"), f) + c.Ingester.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "ingester"), f) c.Generator.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "generator"), f) c.Ingest.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "ingest"), f) c.BlockBuilder.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "block-builder"), f) @@ -165,7 +172,7 @@ func (c *Config) MultitenancyIsEnabled() bool { // CheckConfig checks if config values are suspect and returns a bundled list of warnings and explanation. func (c *Config) CheckConfig() []ConfigWarning { var warnings []ConfigWarning - if c.LiveStore.CompleteBlockTimeout < c.StorageConfig.Trace.BlocklistPoll { + if c.Ingester.CompleteBlockTimeout < c.StorageConfig.Trace.BlocklistPoll { warnings = append(warnings, warnCompleteBlockTimeout) } @@ -250,8 +257,8 @@ type ConfigWarning struct { var ( warnCompleteBlockTimeout = ConfigWarning{ - Message: "live_store.complete_block_timeout < storage.trace.blocklist_poll", - Explain: "You may receive 404s between the time the live-store has flushed a trace and the querier is aware of the new block", + Message: "ingester.complete_block_timeout < storage.trace.blocklist_poll", + Explain: "You may receive 404s between the time the ingesters have flushed a trace and the querier is aware of the new block", } warnBlockRetention = ConfigWarning{ Message: "backend_worker.compaction.compacted_block_timeout < storage.trace.blocklist_poll", diff --git a/cmd/tempo/app/config_test.go b/cmd/tempo/app/config_test.go index 6aea2446b2d..713c941ff44 100644 --- a/cmd/tempo/app/config_test.go +++ b/cmd/tempo/app/config_test.go @@ -6,7 +6,6 @@ import ( "github.com/grafana/tempo/modules/blockbuilder" "github.com/grafana/tempo/modules/frontend" - "github.com/grafana/tempo/modules/livestore" "github.com/grafana/tempo/tempodb/backend/s3" "github.com/stretchr/testify/assert" @@ -68,9 +67,6 @@ func TestConfig_CheckConfig(t *testing.T) { "foo-0": {0}, }, }, - LiveStore: livestore.Config{ - CompleteBlockTimeout: 30 * time.Second, - }, }, expect: []ConfigWarning{ warnCompleteBlockTimeout, diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 4ec677e6163..88dac6df15b 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/tempo/modules/frontend/interceptor" frontend_v1pb 
"github.com/grafana/tempo/modules/frontend/v1/frontendv1pb" "github.com/grafana/tempo/modules/generator" + "github.com/grafana/tempo/modules/ingester" "github.com/grafana/tempo/modules/overrides" userconfigurableoverridesapi "github.com/grafana/tempo/modules/overrides/userconfigurable/api" "github.com/grafana/tempo/modules/querier" @@ -64,11 +65,13 @@ const ( CacheProvider string = "cache-provider" // rings + IngesterRing string = "ring" LiveStoreRing string = "live-store-ring" PartitionRing string = "partition-ring" GeneratorRingWatcher string = "generator-ring-watcher" // individual targets + Ingester string = "ingester" Distributor string = "distributor" MetricsGenerator string = "metrics-generator" MetricsGeneratorNoLocalBlocks string = "metrics-generator-no-local-blocks" @@ -83,6 +86,7 @@ const ( SingleBinary string = "all" // ring names + ringIngester string = "ingester" ringLiveStore string = "live-store" ) @@ -152,6 +156,45 @@ func (t *App) initInternalServer() (services.Service, error) { return s, nil } +func (t *App) initIngesterRing() (services.Service, error) { + // We need to defer ring creation until the service starts, because the + // MemberlistKV needs to be running before we can create the ring's KV client. + var ingesterRing *ring.Ring + + return services.NewBasicService( + // starting + func(ctx context.Context) error { + r, err := tempo_ring.New(t.cfg.Ingester.LifecyclerConfig.RingConfig, ringIngester, t.cfg.Ingester.OverrideRingKey, prometheus.DefaultRegisterer) + if err != nil { + return fmt.Errorf("failed to create ring %s: %w", ringIngester, err) + } + ingesterRing = r + t.Server.HTTPRouter().Handle("/"+ringIngester+"/ring", ingesterRing) + t.readRings[ringIngester] = ingesterRing + + // Start the underlying ring service + if err := ingesterRing.StartAsync(ctx); err != nil { + return err + } + return ingesterRing.AwaitRunning(ctx) + }, + // running + func(ctx context.Context) error { + // Wait until context is done or ring fails + <-ctx.Done() + return nil + }, + // stopping + func(stopping error) error { + if ingesterRing != nil { + ingesterRing.StopAsync() + return ingesterRing.AwaitTerminated(context.Background()) + } + return nil + }, + ), nil +} + func (t *App) initLiveStoreRing() (services.Service, error) { return t.initReadRing(t.cfg.LiveStore.Ring.ToRingConfig(), ringLiveStore, ringLiveStore) } @@ -245,7 +288,15 @@ func (t *App) initDistributor() (services.Service, error) { singleBinary := IsSingleBinary(t.cfg.Target) t.cfg.Distributor.KafkaConfig = t.cfg.Ingest.Kafka - t.cfg.Distributor.PushSpansToKafka = true + // Enable Kafka write path when Kafka is configured (topic is set) and we're not in single binary mode + kafkaConfigured := t.cfg.Ingest.Kafka.Topic != "" + if kafkaConfigured && !singleBinary { + t.cfg.Distributor.PushSpansToKafka = true + t.cfg.Distributor.IngesterWritePathEnabled = false + } else { + t.cfg.Distributor.PushSpansToKafka = false + t.cfg.Distributor.IngesterWritePathEnabled = true + } localPushTargets := distributor.LocalPushTargets{} if singleBinary { @@ -264,27 +315,81 @@ func (t *App) initDistributor() (services.Service, error) { } } - // todo: make write-path client a module instead of passing the config everywhere - distributor, err := distributor.New(t.cfg.Distributor, - localPushTargets, - t.partitionRing, - t.Overrides, - t.TracesConsumerMiddleware, - log.Logger, t.cfg.Server.LogLevel, prometheus.DefaultRegisterer) - if err != nil { - 
return nil, fmt.Errorf("failed to create distributor: %w", err) - } - t.distributor = distributor +// Defer distributor creation until service starts, when the ingester ring is available. + return services.NewBasicService( + // starting + func(ctx context.Context) error { + // Wait for ingester ring to be available + if t.readRings[ringIngester] == nil { + return fmt.Errorf("ingester ring not initialized") + } - if distributor.DistributorRing != nil { - t.Server.HTTPRouter().Handle("/distributor/ring", distributor.DistributorRing) - } + d, err := distributor.New(t.cfg.Distributor, + t.cfg.IngesterClient, + t.readRings[ringIngester], + localPushTargets, + t.partitionRing, + t.Overrides, + t.TracesConsumerMiddleware, + log.Logger, t.cfg.Server.LogLevel, prometheus.DefaultRegisterer) + if err != nil { + return fmt.Errorf("failed to create distributor: %w", err) + } + t.distributor = d + + if d.DistributorRing != nil { + t.Server.HTTPRouter().Handle("/distributor/ring", d.DistributorRing) + } + + if usageHandler := d.UsageTrackerHandler(); usageHandler != nil { + t.Server.HTTPRouter().Handle("/usage_metrics", usageHandler) + } + + // Start the distributor service + if err := d.StartAsync(ctx); err != nil { + return err + } + return d.AwaitRunning(ctx) + }, + // running + func(ctx context.Context) error { + <-ctx.Done() + return nil + }, + // stopping + func(stopping error) error { + if t.distributor != nil { + t.distributor.StopAsync() + return t.distributor.AwaitTerminated(context.Background()) + } + return nil + }, + ), nil +} + +func (t *App) initIngester() (services.Service, error) { + t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort + t.cfg.Ingester.DedicatedColumns = t.cfg.StorageConfig.Trace.Block.DedicatedColumns + t.cfg.Ingester.IngestStorageConfig = t.cfg.Ingest + + // In SingleBinary mode don't try to discover partition from host name. Always use + // partition 0. This is for small installs or local/debugging setups. + singlePartition := IsSingleBinary(t.cfg.Target) - if usageHandler := distributor.UsageTrackerHandler(); usageHandler != nil { - t.Server.HTTPRouter().Handle("/usage_metrics", usageHandler) + ing, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer, singlePartition) + if err != nil { + return nil, fmt.Errorf("failed to create ingester: %w", err) } + t.ingester = ing - return t.distributor, nil + tempopb.RegisterPusherServer(t.Server.GRPC(), t.ingester) + tempopb.RegisterQuerierServer(t.Server.GRPC(), t.ingester) + t.Server.HTTPRouter().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) + t.Server.HTTPRouter().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler)) + t.Server.HTTPRouter().Methods(http.MethodGet, http.MethodPost, http.MethodDelete). + Path("/ingester/prepare-partition-downscale"). + Handler(http.HandlerFunc(t.ingester.PreparePartitionDownscaleHandler)) + return t.ingester, nil } func (t *App) initGenerator() (services.Service, error) { @@ -410,51 +515,95 @@ func (t *App) initQuerier() (services.Service, error) { t.store.EnablePolling(context.Background(), nil, false) } - liveStoreRing := t.readRings[ringLiveStore] + // Defer querier creation until service starts, when the ring is available. + return services.NewBasicService( + // starting + func(ctx context.Context) error { + // For Kafka architecture, use live-store ring. For kafkaless, use ingester ring. 
+ // Check if Kafka is configured to determine which ring to use + var liveStoreRing ring.ReadRing + kafkaConfigured := t.cfg.Ingest.Kafka.Topic != "" + if kafkaConfigured && t.readRings[ringLiveStore] != nil { + liveStoreRing = t.readRings[ringLiveStore] + } else { + liveStoreRing = t.readRings[ringIngester] + } + if liveStoreRing == nil { + return fmt.Errorf("live-store or ingester ring not initialized") + } - querier, err := querier.New( - t.cfg.Querier, - liveStoreRing, - t.cfg.LiveStoreClient, - t.partitionRing, - t.cfg.Frontend.TraceByID.ExternalEnabled, - t.store, - t.Overrides, - ) - if err != nil { - return nil, fmt.Errorf("failed to create querier: %w", err) - } - t.querier = querier + // Only pass partitionRing when Kafka is configured + var partitionRing *ring.PartitionInstanceRing + if t.cfg.Ingest.Kafka.Topic != "" { + partitionRing = t.partitionRing + } - middleware := middleware.Merge( - t.HTTPAuthMiddleware, - ) + q, err := querier.New( + t.cfg.Querier, + liveStoreRing, + t.cfg.LiveStoreClient, + partitionRing, + t.cfg.Frontend.TraceByID.ExternalEnabled, + t.store, + t.Overrides, + ) + if err != nil { + return fmt.Errorf("failed to create querier: %w", err) + } + t.querier = q + + middleware := middleware.Merge( + t.HTTPAuthMiddleware, + ) - tracesHandler := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler) + tracesHandler := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler) - tracesHandlerV2 := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandlerV2)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTracesV2)), tracesHandlerV2) + tracesHandlerV2 := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandlerV2)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTracesV2)), tracesHandlerV2) - searchHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchHandler)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler) + searchHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchHandler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler) - searchTagsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsHandler)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler) + searchTagsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsHandler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler) - searchTagsV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsV2Handler)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler) + searchTagsV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsV2Handler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler) - searchTagValuesHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesHandler)) - 
t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler) + searchTagValuesHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesHandler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler) - searchTagValuesV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesV2Handler)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2)), searchTagValuesV2Handler) + searchTagValuesV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesV2Handler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2)), searchTagValuesV2Handler) - queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.QueryRangeHandler)) - t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) + queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.QueryRangeHandler)) + t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) - return t.querier, t.querier.CreateAndRegisterWorker(t.Server.HTTPHandler()) + if err := t.querier.CreateAndRegisterWorker(t.Server.HTTPHandler()); err != nil { + return fmt.Errorf("failed to create and register worker: %w", err) + } + + // Start the querier service + if err := t.querier.StartAsync(ctx); err != nil { + return err + } + return t.querier.AwaitRunning(ctx) + }, + // running + func(ctx context.Context) error { + <-ctx.Done() + return nil + }, + // stopping + func(stopping error) error { + if t.querier != nil { + t.querier.StopAsync() + return t.querier.AwaitTerminated(context.Background()) + } + return nil + }, + ), nil } func (t *App) initQueryFrontend() (services.Service, error) { @@ -565,6 +714,7 @@ func (t *App) initMemberlistKV() (services.Service, error) { dnsProvider := dns.NewProvider(log.Logger, dnsProviderReg, dns.GolangResolverType) t.MemberlistKV = memberlist.NewKVInitService(&t.cfg.MemberlistKV, log.Logger, dnsProvider, reg) + t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.cfg.Generator.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.cfg.BackendWorker.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV @@ -739,11 +889,13 @@ func (t *App) setupModuleManager() error { mm.RegisterModule(UsageReport, t.initUsageReport) mm.RegisterModule(CacheProvider, t.initCacheProvider, modules.UserInvisibleModule) mm.RegisterModule(GeneratorRingWatcher, t.initGeneratorRingWatcher, modules.UserInvisibleModule) + mm.RegisterModule(IngesterRing, t.initIngesterRing, modules.UserInvisibleModule) mm.RegisterModule(LiveStoreRing, t.initLiveStoreRing, modules.UserInvisibleModule) mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule) mm.RegisterModule(Common, nil, modules.UserInvisibleModule) + mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(Querier, t.initQuerier) mm.RegisterModule(QueryFrontend, t.initQueryFrontend) @@ -756,7 +908,7 @@ func (t *App) setupModuleManager() error { 
mm.RegisterModule(SingleBinary, nil) - distributorDeps := []string{Common, LiveStoreRing, PartitionRing} + distributorDeps := []string{Common, IngesterRing, PartitionRing} if IsSingleBinary(t.cfg.Target) { // In single-binary mode the distributor calls the live-store and metrics-generator in-process. // Make those runtime dependencies explicit in the module DAG instead of relying on sibling @@ -773,25 +925,27 @@ func (t *App) setupModuleManager() error { OverridesAPI: {Server, Overrides}, MemberlistKV: {Server}, UsageReport: {MemberlistKV}, + IngesterRing: {Server, MemberlistKV}, LiveStoreRing: {Server, MemberlistKV}, - PartitionRing: {MemberlistKV, Server, LiveStoreRing}, + PartitionRing: {MemberlistKV, Server, IngesterRing, LiveStoreRing}, GeneratorRingWatcher: {MemberlistKV}, Common: {UsageReport, Server, Overrides}, // individual targets + Ingester: {Common, Store, MemberlistKV}, QueryFrontend: {Common, Store, OverridesAPI}, Distributor: distributorDeps, MetricsGenerator: {Common, MemberlistKV, PartitionRing, GeneratorRingWatcher}, MetricsGeneratorNoLocalBlocks: {Common, MemberlistKV, GeneratorRingWatcher}, - Querier: {Common, Store, LiveStoreRing, PartitionRing}, + Querier: {Common, Store, IngesterRing, LiveStoreRing, PartitionRing}, BlockBuilder: {Common, Store, MemberlistKV, PartitionRing}, BackendScheduler: {Common, Store}, BackendWorker: {Common, Store, MemberlistKV}, LiveStore: {Common, MemberlistKV, PartitionRing}, // composite targets - SingleBinary: {BackendScheduler, BackendWorker, QueryFrontend, Querier, Distributor, MetricsGenerator, BlockBuilder, LiveStore}, + SingleBinary: {QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator}, } for mod, targets := range deps { diff --git a/cmd/tempo/app/overrides_validation.go b/cmd/tempo/app/overrides_validation.go index 8cbabc120ec..589f941de0f 100644 --- a/cmd/tempo/app/overrides_validation.go +++ b/cmd/tempo/app/overrides_validation.go @@ -23,6 +23,13 @@ func newRuntimeConfigValidator(cfg *Config) overrides.Validator { } func (r *runtimeConfigValidator) Validate(config *overrides.Overrides) (warnings []error, err error) { + if config.Ingestion.TenantShardSize != 0 { + ingesterReplicationFactor := r.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor + if config.Ingestion.TenantShardSize < ingesterReplicationFactor { + return warnings, fmt.Errorf("ingester.tenant.shard_size is lower than replication factor (%d < %d)", config.Ingestion.TenantShardSize, ingesterReplicationFactor) + } + } + if config.MetricsGenerator.GenerateNativeHistograms != "" { if err := validation.ValidateHistogramMode(string(config.MetricsGenerator.GenerateNativeHistograms)); err != nil { return warnings, err diff --git a/cmd/tempo/app/overrides_validation_test.go b/cmd/tempo/app/overrides_validation_test.go index d511cbbe9f8..ffccf233d68 100644 --- a/cmd/tempo/app/overrides_validation_test.go +++ b/cmd/tempo/app/overrides_validation_test.go @@ -5,10 +5,12 @@ import ( "testing" "time" + "github.com/grafana/dskit/ring" "github.com/grafana/tempo/modules/distributor" "github.com/grafana/tempo/modules/distributor/forwarder" "github.com/grafana/tempo/modules/generator/processor" "github.com/grafana/tempo/modules/generator/validation" + "github.com/grafana/tempo/modules/ingester" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/overrides/histograms" 
"github.com/grafana/tempo/modules/overrides/userconfigurable/client" @@ -50,6 +52,33 @@ func Test_runtimeOverridesValidator(t *testing.T) { expErr string expWarnings []error }{ + { + name: "ingestion.tenant_shard_size smaller than RF", + cfg: Config{ + Ingester: ingester.Config{ + LifecyclerConfig: ring.LifecyclerConfig{ + RingConfig: ring.Config{ + ReplicationFactor: 3, + }, + }, + }, + }, + overrides: overrides.Overrides{Ingestion: overrides.IngestionOverrides{TenantShardSize: 2}}, + expErr: "ingester.tenant.shard_size is lower than replication factor (2 < 3)", + }, + { + name: "ingestion.tenant_shard_size equal to RF", + cfg: Config{ + Ingester: ingester.Config{ + LifecyclerConfig: ring.LifecyclerConfig{ + RingConfig: ring.Config{ + ReplicationFactor: 3, + }, + }, + }, + }, + overrides: overrides.Overrides{Ingestion: overrides.IngestionOverrides{TenantShardSize: 3}}, + }, { name: "metrics_generator.generate_native_histograms invalid", cfg: Config{}, diff --git a/cmd/tempo/main.go b/cmd/tempo/main.go index dd56aaf17c6..477f100e51f 100644 --- a/cmd/tempo/main.go +++ b/cmd/tempo/main.go @@ -201,6 +201,10 @@ func loadConfig() (*app.Config, bool, error) { // after loading config, let's force some values if in single binary mode // if we're in single binary mode force all rings to be propagated with the in memory store if app.IsSingleBinary(config.Target) { + config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = "inmemory" + config.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor = 1 + config.Ingester.LifecyclerConfig.Addr = "127.0.0.1" + // Generator's ring config.Generator.Ring.KVStore.Store = "inmemory" config.Generator.Ring.InstanceAddr = "127.0.0.1" diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 8cf64276d62..e60477e65be 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -5,6 +5,7 @@ import ( "time" "github.com/grafana/dskit/flagext" + ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/tempo/pkg/ingest" "github.com/grafana/tempo/modules/distributor/forwarder" @@ -41,15 +42,26 @@ type Config struct { Forwarders forwarder.ConfigList `yaml:"forwarders"` Usage usage.Config `yaml:"usage,omitempty"` - KafkaConfig ingest.KafkaConfig `yaml:"kafka_config"` + // IngesterWritePathEnabled enables the ingester write path for non-Kafka deployments. + IngesterWritePathEnabled bool `yaml:"ingester_write_path_enabled"` + // KafkaWritePathEnabled enables the Kafka write path for Kafka-based deployments. + KafkaWritePathEnabled bool `yaml:"kafka_write_path_enabled"` + KafkaConfig ingest.KafkaConfig `yaml:"kafka_config"` // Internal routing toggle set by app wiring (not user-configurable). PushSpansToKafka bool `yaml:"-"` + // disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true + // note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica + ExtendWrites bool `yaml:"extend_writes"` + // configures the distributor to indicate to the client that it should retry resource exhausted errors after the // provided duration RetryAfterOnResourceExhausted time.Duration `yaml:"retry_after_on_resource_exhausted"` + // For testing. + factory ring_client.PoolAddrFunc `yaml:"-"` + // TracePushMiddlewares are hooks called when a trace push request is received. 
// Middleware errors are logged but don't fail the push (fail open behavior). TracePushMiddlewares []TracePushMiddleware `yaml:"-"` @@ -79,6 +91,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.RetryAfterOnResourceExhausted = 0 cfg.OverrideRingKey = distributorRingKey + cfg.ExtendWrites = true + cfg.IngesterWritePathEnabled = false cfg.MaxAttributeBytes = 2048 // 2KB diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index df29aee5b97..143ad4681a5 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -7,6 +7,7 @@ import ( "math" "net/http" "strconv" + "sync" "time" "github.com/go-kit/log" @@ -16,6 +17,7 @@ import ( "github.com/grafana/dskit/limiter" dslog "github.com/grafana/dskit/log" "github.com/grafana/dskit/ring" + ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" @@ -32,9 +34,11 @@ import ( "github.com/grafana/tempo/modules/distributor/receiver" "github.com/grafana/tempo/modules/distributor/usage" "github.com/grafana/tempo/modules/generator" + ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/dataquality" "github.com/grafana/tempo/pkg/ingest" + "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" @@ -51,6 +55,16 @@ const ( ) var ( + metricIngesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "distributor_ingester_appends_total", + Help: "The total number of batch appends sent to ingesters.", + }, []string{"ingester"}) + metricIngesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "distributor_ingester_append_failures_total", + Help: "The total number of failed batch appends sent to ingesters.", + }, []string{"ingester"}) metricGeneratorPushes = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_metrics_generator_pushes_total", @@ -90,6 +104,11 @@ var ( NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 1 * time.Hour, }) + metricIngesterClients = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "tempo", + Name: "distributor_ingester_clients", + Help: "The current number of ingester clients.", + }) metricAttributesTruncated = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_attributes_truncated_total", @@ -167,8 +186,12 @@ type Distributor struct { services.Service cfg Config + clientCfg ingester_client.Config + ingestersRing ring.ReadRing + pool *ring_client.Pool DistributorRing *ring.Ring overrides overrides.Interface + traceEncoder model.SegmentDecoder // Local in-process push targets used in single-binary mode. localPushTargets LocalPushTargets @@ -208,6 +231,8 @@ type Distributor struct { // New a distributor creates. 
func New( cfg Config, + clientCfg ingester_client.Config, + ingestersRing ring.ReadRing, localPushTargets LocalPushTargets, partitionRing ring.PartitionRingReader, o overrides.Interface, @@ -220,6 +245,13 @@ func New( return nil, err } + factory := cfg.factory + if factory == nil { + factory = func(addr string) (ring_client.PoolClient, error) { + return ingester_client.New(addr, clientCfg) + } + } + subservices := []services.Service(nil) // Create the configured ingestion rate limit strategy (local or global). @@ -245,16 +277,29 @@ func New( ingestionRateStrategy = newLocalIngestionRateStrategy(o) } + pool := ring_client.NewPool("distributor_pool", + clientCfg.PoolConfig, + ring_client.NewRingServiceDiscovery(ingestersRing), + factory, + metricIngesterClients, + logger) + + subservices = append(subservices, pool) + pushSpansToKafka := cfg.PushSpansToKafka d := &Distributor{ cfg: cfg, + clientCfg: clientCfg, + ingestersRing: ingestersRing, + pool: pool, DistributorRing: distributorRing, ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), localPushTargets: localPushTargets, partitionRing: partitionRing, pushSpansToKafka: pushSpansToKafka, overrides: o, + traceEncoder: model.MustNewSegmentDecoder(model.CurrentEncoding), tracePushMiddlewares: cfg.TracePushMiddlewares, truncationLogger: tempo_log.NewRateLimitedLogger(truncationLogsPerSecond, level.Warn(logger)), logger: logger, @@ -481,6 +526,13 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te } } + if d.cfg.IngesterWritePathEnabled { + err = d.sendToIngestersViaBytes(ctx, userID, rebatchedTraces, ringTokens) + if err != nil { + return nil, err + } + } + if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil { _ = level.Warn(d.logger).Log("msg", "failed to forward batches for tenant=%s: %w", userID, err) } @@ -551,6 +603,79 @@ func (d *Distributor) pushTracesToLiveStore(ctx context.Context, userID string, return nil } +func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*rebatchedTrace, keys []uint32) error { + marshalledTraces := make([][]byte, len(traces)) + for i, t := range traces { + b, err := d.traceEncoder.PrepareForWrite(t.trace, t.start, t.end) + if err != nil { + return fmt.Errorf("failed to marshal PushRequest: %w", err) + } + marshalledTraces[i] = b + } + + op := ring.WriteNoExtend + if d.cfg.ExtendWrites { + op = ring.Write + } + + numOfTraces := len(keys) + numSuccessByTraceIndex := make([]int, numOfTraces) + lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, numOfTraces) + + var mu sync.Mutex + + writeRing := d.ingestersRing.ShuffleShard(userID, d.overrides.IngestionTenantShardSize(userID)) + + err := ring.DoBatchWithOptions(ctx, op, writeRing, keys, func(ingester ring.InstanceDesc, indexes []int) error { + localCtx, cancel := context.WithTimeout(ctx, d.clientCfg.RemoteTimeout) + defer cancel() + localCtx = user.InjectOrgID(localCtx, userID) + + req := tempopb.PushBytesRequest{ + Traces: make([]tempopb.PreallocBytes, len(indexes)), + Ids: make([][]byte, len(indexes)), + } + + for i, j := range indexes { + req.Traces[i].Slice = marshalledTraces[j][0:] + req.Ids[i] = traces[j].id + } + + c, err := d.pool.GetClientFor(ingester.Addr) + if err != nil { + return err + } + + pushResponse, err := c.(tempopb.PusherClient).PushBytesV2(localCtx, &req) + metricIngesterAppends.WithLabelValues(ingester.Addr).Inc() + + if err != nil { // internal error, drop entire batch + 
metricIngesterAppendFailures.WithLabelValues(ingester.Addr).Inc() + return err + } + + mu.Lock() + defer mu.Unlock() + + d.processPushResponse(pushResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, indexes) + + return nil + }, ring.DoBatchOptions{}) + // if err != nil, we discarded everything because of an internal error (like "context cancelled") + if err != nil { + logDiscardedRebatchedSpans(traces, userID, &d.cfg.LogDiscardedSpans, d.logger) + return err + } + + // count discarded span count + mu.Lock() + defer mu.Unlock() + recordDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing, userID) + logDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing, userID, &d.cfg.LogDiscardedSpans, d.logger) + + return nil +} + func (d *Distributor) sendToGenerators(ctx context.Context, userID string, _ []uint32, traces []*rebatchedTrace, noGenerateMetrics bool) error { req := tempopb.PushSpansRequest{ Batches: nil, @@ -598,7 +723,7 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin return fmt.Errorf("failed to shuffle shard: %w", err) } return ring.DoBatchWithOptions(ctx, ring.Write, ring.NewActivePartitionBatchRing(partitionRing), keys, func(partition ring.InstanceDesc, indexes []int) error { - localCtx, cancel := context.WithTimeout(ctx, d.cfg.KafkaConfig.WriteTimeout) + localCtx, cancel := context.WithTimeout(ctx, d.clientCfg.RemoteTimeout) defer cancel() localCtx = user.InjectOrgID(localCtx, userID) @@ -653,8 +778,8 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin } // requestsByTraceID groups ResourceSpans by trace ID, producing hash-ring tokens and -// rebatched traces for downstream write-path processing. It truncates oversized attributes -// and returns the first truncation example (if any) for diagnostic logging. +// rebatched traces for the ingesters. It truncates oversized attributes and returns +// the first truncation example (if any) for diagnostic logging. func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, truncatedAttributesCount, *truncatedAttrInfo, error) { const tracesPerBatch = 20 // p50 of internal env tracesByID := make(map[uint64]*rebatchedTrace, tracesPerBatch) @@ -810,6 +935,70 @@ func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, trunca return count } +// discardedPredicate determines if a trace is discarded based on the number of successful replications. 
+type discardedPredicate func(int) bool
+
+func newDiscardedPredicate(repFactor int) discardedPredicate {
+	quorum := int(math.Floor(float64(repFactor)/2)) + 1 // minimum successes required for write quorum
+	return func(numSuccess int) bool {
+		return numSuccess < quorum
+	}
+}
+
+func countDiscardedSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) {
+	discarded := newDiscardedPredicate(repFactor)
+
+	for traceIndex, numSuccess := range numSuccessByTraceIndex {
+		if !discarded(numSuccess) {
+			continue
+		}
+		spanCount := traces[traceIndex].spanCount
+		switch lastErrorReasonByTraceIndex[traceIndex] {
+		case tempopb.PushErrorReason_MAX_LIVE_TRACES:
+			maxLiveDiscardedCount += spanCount
+		case tempopb.PushErrorReason_TRACE_TOO_LARGE:
+			traceTooLargeDiscardedCount += spanCount
+		case tempopb.PushErrorReason_UNKNOWN_ERROR:
+			unknownErrorCount += spanCount
+		}
+	}
+
+	return maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount
+}
+
+func (d *Distributor) processPushResponse(pushResponse *tempopb.PushResponse, numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, numOfTraces int, indexes []int) {
+	// no errors
+	if len(pushResponse.ErrorsByTrace) == 0 {
+		for _, reqBatchIndex := range indexes {
+			if reqBatchIndex >= numOfTraces {
+				level.Warn(d.logger).Log("msg", fmt.Sprintf("batch index %d out of bounds for length %d", reqBatchIndex, numOfTraces))
+				continue
+			}
+			numSuccessByTraceIndex[reqBatchIndex]++
+		}
+		return
+	}
+
+	for ringIndex, pushError := range pushResponse.ErrorsByTrace {
+		// translate the ring batch index back to the request batch index, since the
+		// request batch is split into smaller per-instance batches based on the
+		// indexes, e.g. [0,1] [1] [2] [0,2]
+		reqBatchIndex := indexes[ringIndex]
+		if reqBatchIndex >= numOfTraces {
+			level.Warn(d.logger).Log("msg", fmt.Sprintf("batch index %d out of bounds for length %d", reqBatchIndex, numOfTraces))
+			continue
+		}
+
+		// if there was no error, record the success
+		if pushError == tempopb.PushErrorReason_NO_ERROR {
+			numSuccessByTraceIndex[reqBatchIndex]++
+			continue
+		}
+		// otherwise record the last error seen for this trace
+		lastErrorReasonByTraceIndex[reqBatchIndex] = pushError
+	}
+}
+
 func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceivedSpansConfig) {
 	for _, b := range batches {
 		serviceName := ""
@@ -834,6 +1023,43 @@ func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceiv
 	}
 }
 
+func recordDiscardedSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, writeRing ring.ReadRing, userID string) {
+	maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
+	overrides.RecordDiscardedSpans(maxLiveDiscardedCount, overrides.ReasonLiveTracesExceeded, userID)
+	overrides.RecordDiscardedSpans(traceTooLargeDiscardedCount, overrides.ReasonTraceTooLarge, userID)
+	overrides.RecordDiscardedSpans(unknownErrorCount, overrides.ReasonUnknown, userID)
+}
+
+func logDiscardedSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, writeRing ring.ReadRing, userID string, cfg *LogSpansConfig, logger log.Logger) {
+	if !cfg.Enabled {
+		return
+	}
+	discarded := newDiscardedPredicate(writeRing.ReplicationFactor())
+	for traceIndex, 
numSuccess := range numSuccessByTraceIndex { + if !discarded(numSuccess) { + continue + } + errorReason := lastErrorReasonByTraceIndex[traceIndex] + if errorReason != tempopb.PushErrorReason_NO_ERROR { + loggerWithAtts := logger + loggerWithAtts = log.With( + loggerWithAtts, + "push_error_reason", fmt.Sprintf("%v", errorReason), + ) + logDiscardedResourceSpans(traces[traceIndex].trace.ResourceSpans, userID, cfg, loggerWithAtts) + } + } +} + +func logDiscardedRebatchedSpans(batches []*rebatchedTrace, userID string, cfg *LogSpansConfig, logger log.Logger) { + if !cfg.Enabled { + return + } + for _, b := range batches { + logDiscardedResourceSpans(b.trace.ResourceSpans, userID, cfg, logger) + } +} + func logDiscardedResourceSpans(batches []*v1.ResourceSpans, userID string, cfg *LogSpansConfig, logger log.Logger) { if !cfg.Enabled { return diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index a09d6231c71..0e5e91b3cbe 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -8,6 +8,7 @@ import ( "errors" "flag" "fmt" + "maps" "math/rand" "os" "strconv" @@ -18,8 +19,10 @@ import ( kitlog "github.com/go-kit/log" "github.com/gogo/status" "github.com/golang/protobuf/proto" // nolint: all //ProtoReflect + "github.com/grafana/dskit/flagext" dslog "github.com/grafana/dskit/log" "github.com/grafana/dskit/ring" + ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/grafana/tempo/modules/generator" @@ -30,10 +33,13 @@ import ( "github.com/twmb/franz-go/pkg/kfake" "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/pdata/ptrace" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" "github.com/grafana/tempo/modules/distributor/receiver" + ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/tempopb" v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" @@ -44,6 +50,13 @@ import ( "github.com/grafana/tempo/pkg/util/test" ) +const ( + numIngesters = 5 + noError = tempopb.PushErrorReason_NO_ERROR + maxLiveTraceError = tempopb.PushErrorReason_MAX_LIVE_TRACES + traceTooLargeError = tempopb.PushErrorReason_TRACE_TOO_LARGE +) + var ctx = user.InjectOrgID(context.Background(), "test") func batchesToTraces(t *testing.T, batches []*v1.ResourceSpans) ptrace.Traces { @@ -1231,7 +1244,7 @@ func TestDistributor(t *testing.T) { limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) // todo: test limits - d := prepare(t, limits, nil) + d, _ := prepare(t, limits, nil) b := test.MakeBatch(tc.lines, []byte{}) traces := batchesToTraces(t, []*v1.ResourceSpans{b}) @@ -1414,7 +1427,7 @@ func TestLogReceivedSpans(t *testing.T) { buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, limits, logger) + d, _ := prepare(t, limits, logger) d.cfg.LogReceivedSpans = LogSpansConfig{ Enabled: tc.LogReceivedSpansEnabled, FilterByStatusError: tc.filterByStatusError, @@ -1432,6 +1445,202 @@ func TestLogReceivedSpans(t *testing.T) { } } +func 
TestLogDiscardedSpansWhenContextCancelled(t *testing.T) { + for i, tc := range []struct { + LogDiscardedSpansEnabled bool + filterByStatusError bool + includeAllAttributes bool + batches []*v1.ResourceSpans + expectedLogsSpan []testLogSpan + }{ + { + LogDiscardedSpansEnabled: false, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + expectedLogsSpan: []testLogSpan{}, + }, + { + LogDiscardedSpansEnabled: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + expectedLogsSpan: []testLogSpan{ + { + Msg: "discarded", + Level: "info", + Tenant: "test", + TraceID: "0a0102030405060708090a0b0c0d0e0f", + SpanID: "dad44adc9a83b370", + }, + }, + }, + { + LogDiscardedSpansEnabled: true, + includeAllAttributes: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test-service2", []*v1.ScopeSpans{ + makeScope( + makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), + }, makeAttribute("resource_attribute2", "value2")), + }, + expectedLogsSpan: []testLogSpan{ + { + Name: "Test Span", + Msg: "discarded", + Level: "info", + Tenant: "test", + TraceID: "b1c792dea27d511c145df8402bdd793a", + SpanID: "56afb9fe18b6c2d6", + SpanServiceName: "test-service2", + SpanStatus: "STATUS_CODE_ERROR", + SpanKind: "SPAN_KIND_SERVER", + ResourceAttribute2: "value2", + }, + }, + }, + { + LogDiscardedSpansEnabled: true, + filterByStatusError: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test-service", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span1", nil), + makeSpan("e3210a2b38097332d1fe43083ea93d29", "6c21c48da4dbd1a7", "Test Span2", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), + makeScope( + makeSpan("bb42ec04df789ff04b10ea5274491685", "1b3a296034f4031e", "Test Span3", nil)), + }), + makeResourceSpans("test-service2", []*v1.ScopeSpans{ + makeScope( + makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), + }), + }, + expectedLogsSpan: []testLogSpan{ + { + Msg: "discarded", + Level: "info", + Tenant: "test", + TraceID: "e3210a2b38097332d1fe43083ea93d29", + SpanID: "6c21c48da4dbd1a7", + }, + { + Msg: "discarded", + Level: "info", + Tenant: "test", + TraceID: "b1c792dea27d511c145df8402bdd793a", + SpanID: "56afb9fe18b6c2d6", + }, + }, + }, + } { + t.Run(fmt.Sprintf("[%d] TestLogDiscardedSpansWhenContextCancelled LogDiscardedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogDiscardedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { + limits := overrides.Config{} + limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) + + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + + d, _ := prepare(t, limits, logger) + d.cfg.LogDiscardedSpans = LogSpansConfig{ + Enabled: tc.LogDiscardedSpansEnabled, + FilterByStatusError: tc.filterByStatusError, + IncludeAllAttributes: tc.includeAllAttributes, + } + + traces := batchesToTraces(t, tc.batches) + ctx, cancelFunc := context.WithCancelCause(ctx) + cause := errors.New("test cause") + cancelFunc(cause) // cancel to force all spans to be discarded + + _, err := d.PushTraces(ctx, traces) + 
assert.Equal(t, cause, err) + + assert.ElementsMatch(t, tc.expectedLogsSpan, actualLogSpan(t, buf)) + }) + } +} + +func TestLogDiscardedSpansWhenPushToIngesterFails(t *testing.T) { + for i, tc := range []struct { + LogDiscardedSpansEnabled bool + filterByStatusError bool + includeAllAttributes bool + batches []*v1.ResourceSpans + expectedLogsSpan []testLogSpan + pushErrorByTrace []tempopb.PushErrorReason + }{ + { + LogDiscardedSpansEnabled: false, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError}, + expectedLogsSpan: []testLogSpan{}, + }, + { + LogDiscardedSpansEnabled: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError}, + expectedLogsSpan: []testLogSpan{ + { + Msg: "discarded", + Level: "info", + PushErrorReason: "TRACE_TOO_LARGE", + Tenant: "test", + TraceID: "0a0102030405060708090a0b0c0d0e0f", + SpanID: "dad44adc9a83b370", + }, + }, + }, + } { + t.Run(fmt.Sprintf("[%d] TestLogDiscardedSpansWhenPushToIngesterFails LogDiscardedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogDiscardedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { + limits := overrides.Config{} + limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) + + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + + d, ingesters := prepare(t, limits, logger) + d.cfg.LogDiscardedSpans = LogSpansConfig{ + Enabled: tc.LogDiscardedSpansEnabled, + FilterByStatusError: tc.filterByStatusError, + IncludeAllAttributes: tc.includeAllAttributes, + } + + // mock ingester errors + for ingester := range maps.Values(ingesters) { + ingester.pushBytesV2 = func(_ context.Context, _ *tempopb.PushBytesRequest, _ ...grpc.CallOption) (*tempopb.PushResponse, error) { + return &tempopb.PushResponse{ + ErrorsByTrace: tc.pushErrorByTrace, + }, nil + } + } + + traces := batchesToTraces(t, tc.batches) + + _, err := d.PushTraces(ctx, traces) + if err != nil { + t.Fatal(err) + } + assert.ElementsMatch(t, tc.expectedLogsSpan, actualLogSpan(t, buf)) + }) + } +} + func actualLogSpan(t *testing.T, buf *bytes.Buffer) []testLogSpan { bufJSON := "[" + strings.TrimRight(strings.ReplaceAll(buf.String(), "\n", ","), ",") + "]" var actualLogsSpan []testLogSpan @@ -1455,7 +1664,7 @@ func TestRateLimitRespected(t *testing.T) { } buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, overridesConfig, logger) + d, _ := prepare(t, overridesConfig, logger) batches := []*v1.ResourceSpans{ makeResourceSpans("test-service", []*v1.ScopeSpans{ makeScope( @@ -1486,6 +1695,284 @@ func TestRateLimitRespected(t *testing.T) { assert.True(t, status.Code() == codes.ResourceExhausted, "Wrong status code") } +func TestDiscardCountReplicationFactor(t *testing.T) { + tt := []struct { + name string + pushErrorByTrace [][]tempopb.PushErrorReason + replicationFactor int + expectedLiveTracesDiscardedCount int + expectedTraceTooLargeDiscardedCount int + }{ + // trace sizes + // trace[0] = 5 spans + // trace[1] = 10 spans + // trace[2] = 15 spans + { + name: "no errors, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, noError, 
noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "no error, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, noError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "one mlt error, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{maxLiveTraceError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 5, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "one mlt error, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{maxLiveTraceError, noError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "one ttl error, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "one ttl error, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "two mlt errors, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{maxLiveTraceError, noError, noError}, {maxLiveTraceError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 5, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "two ttl errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, traceTooLargeError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "three ttl errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, traceTooLargeError, noError}, {noError, traceTooLargeError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "three mix errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, maxLiveTraceError, noError}, {noError, traceTooLargeError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "three mix trace errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, traceTooLargeError}, {noError, maxLiveTraceError, traceTooLargeError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 10, + expectedTraceTooLargeDiscardedCount: 15, + }, + { + name: "one ttl error rep factor 5 min (3) response", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 5, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "one error rep factor 5 with 4 
responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 5, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "replication factor 1", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}}, + replicationFactor: 1, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + traceByID := make([]*rebatchedTrace, 3) + // batch with 3 traces + traceByID[0] = &rebatchedTrace{ + spanCount: 5, + } + + traceByID[1] = &rebatchedTrace{ + spanCount: 15, + } + + traceByID[2] = &rebatchedTrace{ + spanCount: 10, + } + + keys := []int{0, 2, 1} + + numSuccessByTraceIndex := make([]int, len(traceByID)) + lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, len(traceByID)) + + for _, ErrorByTrace := range tc.pushErrorByTrace { + for ringIndex, err := range ErrorByTrace { + // translate + traceIndex := keys[ringIndex] + + currentNumSuccess := numSuccessByTraceIndex[traceIndex] + if err == tempopb.PushErrorReason_NO_ERROR { + numSuccessByTraceIndex[traceIndex] = currentNumSuccess + 1 + } else { + lastErrorReasonByTraceIndex[traceIndex] = err + } + } + } + + liveTraceDiscardedCount, traceTooLongDiscardedCount, _ := countDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traceByID, tc.replicationFactor) + + require.Equal(t, tc.expectedLiveTracesDiscardedCount, liveTraceDiscardedCount) + require.Equal(t, tc.expectedTraceTooLargeDiscardedCount, traceTooLongDiscardedCount) + }) + } +} + +func TestProcessIngesterPushByteResponse(t *testing.T) { + // batch has 5 traces [0, 1, 2, 3, 4, 5] + numOfTraces := 5 + tt := []struct { + name string + pushErrorByTrace []tempopb.PushErrorReason + indexes []int + expectedSuccessIndex []int + expectedLastErrorIndex []tempopb.PushErrorReason + }{ + { + name: "explicit no errors, first three traces", + pushErrorByTrace: []tempopb.PushErrorReason{noError, noError, noError}, + indexes: []int{0, 1, 2}, + expectedSuccessIndex: []int{1, 1, 1, 0, 0}, + expectedLastErrorIndex: make([]tempopb.PushErrorReason, numOfTraces), + }, + { + name: "no errors, no ErrorsByTrace value", + pushErrorByTrace: []tempopb.PushErrorReason{}, + indexes: []int{1, 2, 3}, + expectedSuccessIndex: []int{0, 1, 1, 1, 0}, + expectedLastErrorIndex: make([]tempopb.PushErrorReason, numOfTraces), + }, + { + name: "all errors, first three traces", + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError, traceTooLargeError, traceTooLargeError}, + indexes: []int{0, 1, 2}, + expectedSuccessIndex: []int{0, 0, 0, 0, 0}, + expectedLastErrorIndex: []tempopb.PushErrorReason{traceTooLargeError, traceTooLargeError, traceTooLargeError, noError, noError}, + }, + { + name: "random errors, random three traces", + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError, maxLiveTraceError, noError}, + indexes: []int{0, 2, 4}, + expectedSuccessIndex: []int{0, 0, 0, 0, 1}, + expectedLastErrorIndex: []tempopb.PushErrorReason{traceTooLargeError, noError, maxLiveTraceError, noError, noError}, + }, + } + + // prepare test data + overridesConfig := overrides.Config{} + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + d, _ := prepare(t, overridesConfig, logger) + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + 
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			traceByID := make([]*rebatchedTrace, 3)
+			// batch with 3 traces
+			traceByID[0] = &rebatchedTrace{
+				spanCount: 5,
+			}
+
+			traceByID[1] = &rebatchedTrace{
+				spanCount: 15,
+			}
+
+			traceByID[2] = &rebatchedTrace{
+				spanCount: 10,
+			}
+
+			keys := []int{0, 2, 1}
+
+			numSuccessByTraceIndex := make([]int, len(traceByID))
+			lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, len(traceByID))
+
+			for _, errorsByTrace := range tc.pushErrorByTrace {
+				for ringIndex, err := range errorsByTrace {
+					// translate the ring index into the trace index for this batch
+					traceIndex := keys[ringIndex]
+
+					currentNumSuccess := numSuccessByTraceIndex[traceIndex]
+					if err == tempopb.PushErrorReason_NO_ERROR {
+						numSuccessByTraceIndex[traceIndex] = currentNumSuccess + 1
+					} else {
+						lastErrorReasonByTraceIndex[traceIndex] = err
+					}
+				}
+			}
+
+			liveTraceDiscardedCount, traceTooLongDiscardedCount, _ := countDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traceByID, tc.replicationFactor)
+
+			require.Equal(t, tc.expectedLiveTracesDiscardedCount, liveTraceDiscardedCount)
+			require.Equal(t, tc.expectedTraceTooLargeDiscardedCount, traceTooLongDiscardedCount)
+		})
+	}
+}
+
+func TestProcessIngesterPushByteResponse(t *testing.T) {
+	// batch has 5 traces, indexed [0, 1, 2, 3, 4]
+	numOfTraces := 5
+	tt := []struct {
+		name                   string
+		pushErrorByTrace       []tempopb.PushErrorReason
+		indexes                []int
+		expectedSuccessIndex   []int
+		expectedLastErrorIndex []tempopb.PushErrorReason
+	}{
+		{
+			name:                   "explicit no errors, first three traces",
+			pushErrorByTrace:       []tempopb.PushErrorReason{noError, noError, noError},
+			indexes:                []int{0, 1, 2},
+			expectedSuccessIndex:   []int{1, 1, 1, 0, 0},
+			expectedLastErrorIndex: make([]tempopb.PushErrorReason, numOfTraces),
+		},
+		{
+			name:                   "no errors, no ErrorsByTrace value",
+			pushErrorByTrace:       []tempopb.PushErrorReason{},
+			indexes:                []int{1, 2, 3},
+			expectedSuccessIndex:   []int{0, 1, 1, 1, 0},
+			expectedLastErrorIndex: make([]tempopb.PushErrorReason, numOfTraces),
+		},
+		{
+			name:                   "all errors, first three traces",
+			pushErrorByTrace:       []tempopb.PushErrorReason{traceTooLargeError, traceTooLargeError, traceTooLargeError},
+			indexes:                []int{0, 1, 2},
+			expectedSuccessIndex:   []int{0, 0, 0, 0, 0},
+			expectedLastErrorIndex: []tempopb.PushErrorReason{traceTooLargeError, traceTooLargeError, traceTooLargeError, noError, noError},
+		},
+		{
+			name:                   "random errors, random three traces",
+			pushErrorByTrace:       []tempopb.PushErrorReason{traceTooLargeError, maxLiveTraceError, noError},
+			indexes:                []int{0, 2, 4},
+			expectedSuccessIndex:   []int{0, 0, 0, 0, 1},
+			expectedLastErrorIndex: []tempopb.PushErrorReason{traceTooLargeError, noError, maxLiveTraceError, noError, noError},
+		},
+	}
+
+	// prepare test data
+	overridesConfig := overrides.Config{}
+	buf := &bytes.Buffer{}
+	logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf))
+	d, _ := prepare(t, overridesConfig, logger)
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			numSuccessByTraceIndex := make([]int, numOfTraces)
+			lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, numOfTraces)
+			pushByteResponse := &tempopb.PushResponse{
+				ErrorsByTrace: tc.pushErrorByTrace,
+			}
+			d.processPushResponse(pushByteResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, tc.indexes)
+			assert.Equal(t, tc.expectedSuccessIndex, numSuccessByTraceIndex)
+			assert.Equal(t, tc.expectedLastErrorIndex, lastErrorReasonByTraceIndex)
+		})
+	}
+}
+
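+// TestIngesterPushBytes drives processPushResponse and countDiscardedSpans
+// together across several batches. When a trace accumulates different error
+// reasons, the last response processed wins: trace 4 below sees
+// trace_too_large and then max_live, so its 20 spans are counted against the
+// max_live discard total.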
+func TestIngesterPushBytes(t *testing.T) {
+	// prepare test data
+	overridesConfig := overrides.Config{}
+	buf := &bytes.Buffer{}
+	logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf))
+	d, _ := prepare(t, overridesConfig, logger)
+
+	traces := []*rebatchedTrace{
+		{
+			spanCount: 1,
+		},
+		{
+			spanCount: 5,
+		},
+		{
+			spanCount: 10,
+		},
+		{
+			spanCount: 15,
+		},
+		{
+			spanCount: 20,
+		},
+	}
+	numOfTraces := len(traces)
+	numSuccessByTraceIndex := make([]int, numOfTraces)
+	lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, numOfTraces)
+
+	// 0 = trace_too_large, trace_too_large || discard count: 1
+	// 1 = no error, trace_too_large        || discard count: 5
+	// 2 = no error, no error               || discard count: 0
+	// 3 = max_live, max_live               || discard count: 15
+	// 4 = trace_too_large, max_live        || discard count: 20
+	// total ttl: 6, mlt: 35
+
+	batches := [][]int{
+		{0, 1, 2},
+		{1, 3},
+		{0, 2},
+		{3, 4},
+		{4},
+	}
+
+	errorsByTraces := [][]tempopb.PushErrorReason{
+		{traceTooLargeError, noError, noError},
+		{traceTooLargeError, maxLiveTraceError},
+		{traceTooLargeError, noError},
+		{maxLiveTraceError, traceTooLargeError},
+		{maxLiveTraceError},
+	}
+
+	for i, indexes := range batches {
+		pushResponse := &tempopb.PushResponse{
+			ErrorsByTrace: errorsByTraces[i],
+		}
+		d.processPushResponse(pushResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, indexes)
+	}
+
+	maxLiveDiscardedCount, traceTooLargeDiscardedCount, _ := countDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, 3)
+	assert.Equal(t, 6, traceTooLargeDiscardedCount)
+	assert.Equal(t, 35, maxLiveDiscardedCount)
+}
+
 func TestPushTracesSkipMetricsGenerationIngestStorage(t *testing.T) {
 	const topic = "test-topic"

@@ -1496,7 +1983,8 @@ func TestPushTracesSkipMetricsGenerationIngestStorage(t *testing.T) {
 	limitCfg := overrides.Config{}
 	limitCfg.RegisterFlagsAndApplyDefaults(&flag.FlagSet{})

-	distributorCfg, overridesSvc, limits, middleware := setupDependencies(t, limitCfg)
+	distributorCfg, ingesterClientCfg, overridesSvc, _,
+		ingesterRing, limits, middleware := setupDependencies(t, limitCfg)
 	distributorCfg.PushSpansToKafka = true
 	distributorCfg.KafkaConfig = ingest.KafkaConfig{}
@@ -1506,6 +1994,8 @@
 	d, err := New(
 		distributorCfg,
+		ingesterClientCfg,
+		ingesterRing,
 		LocalPushTargets{},
 		singlePartitionRingReader{},
 		overridesSvc,
@@ -1583,7 +2073,7 @@ func TestPushTracesToLocalLiveStore(t *testing.T) {
 	limitCfg := &flag.FlagSet{}
 	limits.RegisterFlagsAndApplyDefaults(limitCfg)

-	distributorCfg, overridesSvc, loggingLevel, middleware := setupDependencies(t, limits)
+	distributorCfg, clientCfg, overridesSvc, _, ingesterRing, loggingLevel, middleware := setupDependencies(t, limits)

 	var (
 		called bool
@@ -1593,6 +2083,8 @@
 	d, err := New(
 		distributorCfg,
+		clientCfg,
+		ingesterRing,
 		LocalPushTargets{
 			LiveStore: func(ctx context.Context, req *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) {
 				called = true
@@ -1632,10 +2124,12 @@ func TestPushTracesToLocalLiveStoreError(t *testing.T) {
 	limitCfg := &flag.FlagSet{}
 	limits.RegisterFlagsAndApplyDefaults(limitCfg)

-	distributorCfg, overridesSvc, loggingLevel, middleware := setupDependencies(t, limits)
+	distributorCfg, clientCfg, overridesSvc, _, ingesterRing, loggingLevel, middleware := setupDependencies(t, limits)

 	d, err := New(
 		distributorCfg,
+		clientCfg,
+		ingesterRing,
 		LocalPushTargets{
 			LiveStore: func(_ context.Context, _ *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) {
 				return nil, errors.New("boom")
@@ -1662,12 +2156,14 @@ func TestPushLocalSkipsGeneratorWhenLiveStoreFails(t *testing.T) {
 	limits.RegisterFlagsAndApplyDefaults(limitCfg)
 	limits.Defaults.MetricsGenerator.Processors = listtomap.ListToMap{"service-graphs": {}}

-	distributorCfg, overridesSvc, loggingLevel, middleware := setupDependencies(t, limits)
+	distributorCfg, clientCfg, overridesSvc, _, ingesterRing, loggingLevel, middleware := setupDependencies(t, limits)

 	generatorCalled := make(chan struct{}, 1)

 	d, err := New(
 		distributorCfg,
+		clientCfg,
+		ingesterRing,
 		LocalPushTargets{
 			Generator: func(_ context.Context, _ *tempopb.PushSpansRequest) (*tempopb.PushResponse, error) {
 				generatorCalled <- struct{}{}
@@ -1715,7 +2211,7 @@ func TestPushTracesKafkaThenLocalLiveStoreFailure(t *testing.T) {
 	limitCfg := &flag.FlagSet{}
 	limits.RegisterFlagsAndApplyDefaults(limitCfg)

-	distributorCfg, overridesSvc, loggingLevel, middleware := setupDependencies(t, limits)
+	distributorCfg, clientCfg, overridesSvc, _, ingesterRing, loggingLevel, middleware := setupDependencies(t, limits)
 	distributorCfg.PushSpansToKafka = true
 	distributorCfg.KafkaConfig = ingest.KafkaConfig{}
 	distributorCfg.KafkaConfig.RegisterFlags(&flag.FlagSet{})
@@ -1724,6 +2220,8 @@
 	d, err := New(
 		distributorCfg,
+		clientCfg,
+		ingesterRing,
 		LocalPushTargets{
 			LiveStore: func(_ context.Context, _ *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) {
 				return nil, errors.New("local live-store failure")
@@ -1772,7 +2270,7 @@ func TestArtificialLatency(t *testing.T) {
 	latency := 50 * time.Millisecond
 	buf := &bytes.Buffer{}
 	logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf))
-	d := prepare(t, overridesConfig, logger)
+	d, _ := prepare(t, overridesConfig, logger)
 	d.cfg.ArtificialDelay = latency

 	batches := []*v1.ResourceSpans{
@@ -1803,7 +2301,7 @@ func TestArtificialLatencyIsAppliedOnError(t *testing.T) {
 	latency := 50 * time.Millisecond
 	buf := &bytes.Buffer{}
 	logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf))
-	d := prepare(t, overridesConfig, logger)
+	d, _ := prepare(t, overridesConfig, logger)
 	d.cfg.ArtificialDelay = latency

 	batches := []*v1.ResourceSpans{
@@ -1900,37 +2398,195 @@ func makeResourceSpans(serviceName string, ils []*v1.ScopeSpans, attributes ...*v1_common.KeyValue) *v1.ResourceSpans {
 	return rs
 }

-func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) *Distributor {
+func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Distributor, map[string]*mockIngester) {
 	if logger == nil {
 		logger = kitlog.NewNopLogger()
 	}

-	distributorConfig, overridesSvc, l, mw := setupDependencies(t, limits)
-	d, err := New(distributorConfig, LocalPushTargets{}, nil, overridesSvc, mw, logger, l, prometheus.NewPedanticRegistry())
+	distributorConfig, clientConfig, overridesSvc, ingesters, ingestersRing, l, mw := setupDependencies(t, limits)
+	d, err := New(distributorConfig, clientConfig, ingestersRing, LocalPushTargets{}, nil, overridesSvc, mw, logger, l, prometheus.NewPedanticRegistry())
 	require.NoError(t, err)

-	return d
+	return d, ingesters
 }

-func setupDependencies(t *testing.T, limits overrides.Config) (Config, overrides.Service, dslog.Level, receiver.Middleware) {
+func setupDependencies(t *testing.T, limits overrides.Config) (Config, ingester_client.Config, overrides.Service, map[string]*mockIngester, *mockRing, dslog.Level, receiver.Middleware) {
 	t.Helper()

-	var distributorConfig Config
+	var (
+		distributorConfig Config
+		clientConfig      ingester_client.Config
+	)
+	flagext.DefaultValues(&clientConfig)

 	overridesSvc, err := overrides.NewOverrides(limits, nil, prometheus.DefaultRegisterer)
 	require.NoError(t, err)

+	// Mock the ingesters ring
+	ingesters := map[string]*mockIngester{}
+	for i := 0; i < numIngesters; i++ {
+		ingesters[fmt.Sprintf("ingester%d", i)] = &mockIngester{
+			pushBytes:   pushBytesNoOp,
+			pushBytesV2: pushBytesNoOp,
+		}
+	}
+
+	ingestersRing := &mockRing{
+		replicationFactor: 3,
+	}
+	for addr := range ingesters {
+		ingestersRing.ingesters = append(ingestersRing.ingesters, ring.InstanceDesc{
+			Addr: addr,
+		})
+	}
+
 	distributorConfig.MaxAttributeBytes = 1000
 	distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond
 	distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
 	distributorConfig.DistributorRing.KVStore.Mock = nil
 	distributorConfig.DistributorRing.InstanceInterfaceNames = []string{"eth0", "en0", "lo0"}
+	distributorConfig.IngesterWritePathEnabled = true
+	distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) {
+		return ingesters[addr], nil
+	}

 	l := dslog.Level{}
 	_ = l.Set("error")
 	mw := receiver.MultiTenancyMiddleware()

-	return distributorConfig, overridesSvc, l, mw
+	return distributorConfig, clientConfig, overridesSvc, ingesters, ingestersRing, l, mw
+}
+
+type mockIngester struct {
+	grpc_health_v1.HealthClient
+	// pushBytes mock to be overridden in test scenarios if needed
+	pushBytes func(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error)
+	// pushBytesV2 mock to be overridden in test scenarios if needed
+	pushBytesV2 func(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error)
+}
+
+func pushBytesNoOp(context.Context, *tempopb.PushBytesRequest, ...grpc.CallOption) (*tempopb.PushResponse, error) {
+	return &tempopb.PushResponse{}, nil
+}
+
+var _ tempopb.PusherClient = (*mockIngester)(nil)
+
+func (i *mockIngester) PushBytes(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error) {
+	return i.pushBytes(ctx, in, opts...)
+}
+
+func (i *mockIngester) PushBytesV2(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error) {
+	return i.pushBytesV2(ctx, in, opts...)
+}
+
+func (i *mockIngester) Close() error {
+	return nil
+}
+
+// Copied from Cortex; TODO(twilkie) - factor this out and share it.
+// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor
+// ingesters.
+type mockRing struct {
+	prometheus.Counter
+	ingesters         []ring.InstanceDesc
+	replicationFactor uint32
+}
+
+func (r mockRing) GetSubringForOperationStates(_ ring.Operation) ring.ReadRing {
+	panic("implement me if required for testing")
+}
+
+func (r mockRing) WritableInstancesWithTokensCount() int {
+	panic("implement me if required for testing")
+}
+
+func (r mockRing) WritableInstancesWithTokensInZoneCount(string) int {
+	panic("implement me if required for testing")
+}
+
+var _ ring.ReadRing = (*mockRing)(nil)
+
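+// Get returns the replicationFactor instances starting at key modulo the ring
+// size, e.g. with 5 ingesters and replication factor 3, key 7 selects
+// instances 2, 3 and 4.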
+func (r mockRing) Get(key uint32, _ ring.Operation, buf []ring.InstanceDesc, _, _ []string) (ring.ReplicationSet, error) {
+	result := ring.ReplicationSet{
+		MaxErrors: 1,
+		Instances: buf[:0],
+	}
+	for i := uint32(0); i < r.replicationFactor; i++ {
+		n := (key + i) % uint32(len(r.ingesters))
+		result.Instances = append(result.Instances, r.ingesters[n])
+	}
+	return result, nil
+}
+
+func (r mockRing) GetWithOptions(key uint32, _ ring.Operation, _ ...ring.Option) (ring.ReplicationSet, error) {
+	buf := make([]ring.InstanceDesc, 0)
+	result := ring.ReplicationSet{
+		MaxErrors: 1,
+		Instances: buf,
+	}
+	for i := uint32(0); i < r.replicationFactor; i++ {
+		n := (key + i) % uint32(len(r.ingesters))
+		result.Instances = append(result.Instances, r.ingesters[n])
+	}
+	return result, nil
+}
+
+func (r mockRing) GetAllHealthy(ring.Operation) (ring.ReplicationSet, error) {
+	return ring.ReplicationSet{
+		Instances: r.ingesters,
+		MaxErrors: 1,
+	}, nil
+}
+
+func (r mockRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
+	return r.GetAllHealthy(op)
+}
+
+func (r mockRing) ReplicationFactor() int {
+	return int(r.replicationFactor)
+}
+
+func (r mockRing) ShuffleShard(string, int) ring.ReadRing {
+	return r
+}
+
+func (r mockRing) ShuffleShardWithLookback(string, int, time.Duration, time.Time) ring.ReadRing {
+	return r
+}
+
+func (r mockRing) GetTokenRangesForInstance(_ string) (ring.TokenRanges, error) {
+	return nil, nil
+}
+
+func (r mockRing) InstancesCount() int {
+	return len(r.ingesters)
+}
+
+func (r mockRing) InstancesWithTokensCount() int {
+	return len(r.ingesters)
+}
+
+func (r mockRing) HasInstance(string) bool {
+	return true
+}
+
+func (r mockRing) CleanupShuffleShardCache(string) {
+}
+
+func (r mockRing) GetInstanceState(string) (ring.InstanceState, error) {
+	return ring.ACTIVE, nil
+}
+
+func (r mockRing) InstancesInZoneCount(string) int {
+	return 0
+}
+
+func (r mockRing) InstancesWithTokensInZoneCount(_ string) int {
+	return 0
+}
+
+func (r mockRing) ZonesCount() int {
+	return 0
+}
+
 type singlePartitionRingReader struct{}
@@ -2025,7 +2681,7 @@ func TestCheckForRateLimits(t *testing.T) {

 	// Create a distributor with the overrides
 	logger := kitlog.NewNopLogger()
-	d := prepare(t, overridesConfig, logger)
+	d, _ := prepare(t, overridesConfig, logger)

 	// check if we can ingest the batch
 	err := d.checkForRateLimits(tc.tracesSize, 100, "test-user")
@@ -2154,7 +2810,7 @@ func TestRetryInfoEnabled(t *testing.T) {
 		},
 	}

-	d := prepare(t, limits, nil)
+	d, _ := prepare(t, limits, nil)
 	d.cfg.RetryAfterOnResourceExhausted = tt.retryAfterOnResourceExhausted

 	result, err := d.RetryInfoEnabled(tt.ctx)
@@ -2174,7 +2830,8 @@ func TestTracePushMiddlewareCalled(t *testing.T) {
 	limitCfg := &flag.FlagSet{}
 	limits.RegisterFlagsAndApplyDefaults(limitCfg)

-	distributorCfg, overridesSvc, loggingLevel, middleware := setupDependencies(t, limits)
+	distributorCfg, ingesterClientCfg, overridesSvc, ingesters,
+		ingesterRing, loggingLevel, middleware := setupDependencies(t, limits)

 	// Track middleware calls
 	var middlewareCalled bool
@@ -2191,6 +2848,8 @@
 	d, err := New(
 		distributorCfg,
+		ingesterClientCfg,
+		ingesterRing,
 		LocalPushTargets{},
 		nil,
 		overridesSvc,
@@ -2201,6 +2860,12 @@
 	)
 	require.NoError(t, err)

+	// Reset the ingester mocks so every push succeeds
+	for _, ingester := range ingesters {
+		ingester.pushBytes = pushBytesNoOp
+		ingester.pushBytesV2 = pushBytesNoOp
+	}
+
 	// Create test traces
 	traces := batchesToTraces(t, []*v1.ResourceSpans{test.MakeBatch(10, nil)})
@@ -2219,7 +2884,8 @@ func TestTracePushMiddlewareFailsOpen(t *testing.T) {
 	limitCfg := &flag.FlagSet{}
 	limits.RegisterFlagsAndApplyDefaults(limitCfg)

-	distributorCfg, overridesSvc, loggingLevel, middleware := setupDependencies(t, limits)
+	distributorCfg, ingesterClientCfg, overridesSvc, _,
+		ingesterRing, loggingLevel, middleware := setupDependencies(t, limits)

 	// Create a middleware that returns an error
 	expectedErr := errors.New("middleware error")
@@ -2231,6 +2897,8 @@
 	d, err := New(
 		distributorCfg,
+		ingesterClientCfg,
+		ingesterRing,
 		LocalPushTargets{},
 		nil,
 		overridesSvc,
diff --git a/modules/distributor/receiver/shim.go b/modules/distributor/receiver/shim.go
index a1a58cde959..6836e25f8c3 100644
--- a/modules/distributor/receiver/shim.go
+++ b/modules/distributor/receiver/shim.go
@@ -51,7 +51,7 @@ var (
 	metricPushDuration = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace:                       "tempo",
 		Name:                            "distributor_push_duration_seconds",
-		Help:                            "Records the amount of time to process and route a batch through the distributor.",
+		Help:                            "Records the amount of time to push a batch to the ingester.",
 		Buckets:                         prometheus.DefBuckets,
 		NativeHistogramBucketFactor:     1.1,
 		NativeHistogramMaxBucketNumber:  100,
diff --git a/modules/querier/querier.go b/modules/querier/querier.go
index 0e9aab6c109..b0384263173 100644
--- a/modules/querier/querier.go
+++ b/modules/querier/querier.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"net/http"
 	"sort"
+	"sync"

 	"github.com/go-kit/log/level"
 	httpgrpc_server "github.com/grafana/dskit/httpgrpc/server"
@@ -56,6 +57,7 @@ type Querier struct {

 	cfg Config

+	liveStoreRing ring.ReadRing
 	liveStorePool *ring_client.Pool
 	partitionRing *ring.PartitionInstanceRing

@@ -89,6 +91,7 @@ func New(

 	q := &Querier{
 		cfg:           cfg,
+		liveStoreRing: liveStoreRing,
 		partitionRing: partitionRing,
 		liveStorePool: ring_client.NewPool("querier_to_livestore_pool",
 			liveStoreClientConfig.PoolConfig,
@@ -321,8 +324,17 @@ func (q *Querier) forLiveStoreRing(ctx context.Context, f forEachFn) ([]any, error) {
 	ctx, span := tracer.Start(ctx, "Querier.forLiveStoreRing")
 	defer span.End()

+	// If partition ring is not configured, use the regular ring directly (kafkaless mode)
 	if q.partitionRing == nil {
-		return nil, errors.New("forLiveStoreRing: partition ring is not configured")
+		if q.liveStoreRing == nil {
+			return nil, errors.New("forLiveStoreRing: neither partition ring nor live store ring is configured")
+		}
+		// Get all healthy instances from the ring
+		rs, err := q.liveStoreRing.GetReplicationSetForOperation(ring.Read)
+		if err != nil {
+			return nil, fmt.Errorf("error finding ring replicas: %w", err)
+		}
+		return forReplicationSet(ctx, q, rs, f)
 	}

 	rs, err := q.partitionRing.GetReplicationSetsForOperation(ring.Read)
@@ -342,9 +354,19 @@ func (q *Querier) forLiveStoreMetricsRing(ctx context.Context, f forEachMetricsFn) ([]any, error) {
 	ctx, span := tracer.Start(ctx, "Querier.forLiveStoreMetricsRing")
 	defer span.End()

+	// If partition ring is not configured, use the regular ring directly (kafkaless mode)
 	if q.partitionRing == nil {
-		return nil, errors.New("forLiveStoreMetricsRing: partition ring is not configured")
+		if q.liveStoreRing == nil {
+			return nil, errors.New("forLiveStoreMetricsRing: neither partition ring nor live store ring is configured")
+		}
+		// Get all healthy instances from the ring
+		rs, err := q.liveStoreRing.GetReplicationSetForOperation(ring.Read)
+		if err != nil {
+			return nil, fmt.Errorf("error finding ring replicas: %w", err)
+		}
+		return forReplicationSet(ctx, q, rs, f)
 	}
+
 	rs, err := q.partitionRing.GetReplicationSetsForOperation(ring.Read)
 	if err != nil {
 		return nil, fmt.Errorf("error finding partition ring replicas: %w", err)
@@ -352,6 +374,49 @@ func (q *Querier) forLiveStoreMetricsRing(ctx context.Context, f forEachMetricsFn) ([]any, error) {
 	return forPartitionRingReplicaSets(ctx, q, rs, f)
 }

+// forReplicationSet runs f for all instances in a single replication set (kafkaless mode).
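+// Instances are queried concurrently; failing instances are skipped, and the
+// first error is returned only when no instance succeeds, so partial failures
+// degrade results rather than failing the whole query.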
+func forReplicationSet[R any, TClient any](ctx context.Context, q *Querier, rs ring.ReplicationSet, f func(context.Context, TClient) (R, error)) ([]R, error) {
+	var results []R
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+	var firstErr error
+
+	for _, instance := range rs.Instances {
+		instance := instance
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			client, err := q.liveStorePool.GetClientForInstance(instance)
+			if err != nil {
+				mu.Lock()
+				if firstErr == nil {
+					firstErr = err
+				}
+				mu.Unlock()
+				return
+			}
+			result, err := f(ctx, client.(TClient))
+			if err != nil {
+				mu.Lock()
+				if firstErr == nil {
+					firstErr = err
+				}
+				mu.Unlock()
+				return
+			}
+			mu.Lock()
+			results = append(results, result)
+			mu.Unlock()
+		}()
+	}
+	wg.Wait()
+
+	if firstErr != nil && len(results) == 0 {
+		return nil, firstErr
+	}
+	return results, nil
+}
+
 func (q *Querier) SearchRecent(ctx context.Context, req *tempopb.SearchRequest) (*tempopb.SearchResponse, error) {
 	if _, err := validation.ExtractValidTenantID(ctx); err != nil {
 		return nil, fmt.Errorf("error extracting org id in Querier.SearchRecent: %w", err)