1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@
* [ENHANCEMENT] Improve TraceQL perf by reverting EqualRowNumber to an inlineable function. [#4705](https://github.com/grafana/tempo/pull/4705) (@joe-elliott)
* [ENHANCEMENT] rhythm: fair partition consumption in blockbuilders [#4655](https://github.com/grafana/tempo/pull/4655) (@javiermolinar)
* [ENHANCEMENT] TraceQL: add support for querying by parent span id [#4692](https://github.com/grafana/tempo/pull/4692) (@ie-pham)
* [ENHANCEMENT] metrics-generator: allow skipping localblocks and consuming from a different source of data [#4686](https://github.com/grafana/tempo/pull/4686) (@flxbk)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4576](https://github.com/grafana/tempo/pull/4576) (@joe-elliott)
  Correctly copy exemplars for metrics like `| rate()` when streaming over gRPC.
* [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio)
1 change: 1 addition & 0 deletions cmd/tempo/app/app.go
@@ -67,6 +67,7 @@ type App struct {
readRings map[string]*ring.Ring
partitionRing *ring.PartitionInstanceRing
partitionRingWatcher *ring.PartitionRingWatcher
generatorRingWatcher *ring.PartitionRingWatcher
Overrides overrides.Service
distributor *distributor.Distributor
querier *querier.Querier
87 changes: 73 additions & 14 deletions cmd/tempo/app/modules.go
@@ -67,15 +67,17 @@ const (
SecondaryIngesterRing string = "secondary-ring"
MetricsGeneratorRing string = "metrics-generator-ring"
PartitionRing string = "partition-ring"
GeneratorRingWatcher string = "generator-ring-watcher"

// individual targets
Distributor string = "distributor"
Ingester string = "ingester"
MetricsGenerator string = "metrics-generator"
Querier string = "querier"
QueryFrontend string = "query-frontend"
Compactor string = "compactor"
BlockBuilder string = "block-builder"
Distributor string = "distributor"
Ingester string = "ingester"
MetricsGenerator string = "metrics-generator"
MetricsGeneratorNoLocalBlocks string = "metrics-generator-no-local-blocks"
Querier string = "querier"
QueryFrontend string = "query-frontend"
Compactor string = "compactor"
BlockBuilder string = "block-builder"

// composite targets
SingleBinary string = "all"
@@ -328,6 +330,59 @@ func (t *App) initGenerator() (services.Service, error) {
return t.generator, nil
}

func (t *App) initGeneratorNoLocalBlocks() (services.Service, error) {
reg := prometheus.DefaultRegisterer

t.cfg.Generator.Ingest = t.cfg.Ingest

// In this mode, the generator runs as a stateless queue consumer that reads from
// Kafka and remote writes to a Prometheus-compatible metrics store.
if !t.cfg.Ingest.Enabled {
return nil, errors.New("ingest storage must be enabled to run metrics generator in this mode")
}
// The localblocks processor is disabled in this mode.
t.cfg.Generator.DisableLocalBlocks = true
// The store is used only by the localblocks processor. We don't need it when
// running with that processor disabled, so we keep the default zero value.
var store tempo_storage.Store
// In this mode, the generator does not need to become available to serve
// queries, so we can skip setting up a gRPC server.
t.cfg.Generator.DisableGRPC = true

var err error
t.generator, err = generator.New(&t.cfg.Generator, t.Overrides, reg, t.generatorRingWatcher, store, log.Logger)
if err != nil {
return nil, fmt.Errorf("failed to create metrics-generator: %w", err)
}

return t.generator, nil
}

func (t *App) initGeneratorRingWatcher() (services.Service, error) {
reg := prometheus.DefaultRegisterer

kvRegisterer := kv.RegistererWithKVName(reg, t.cfg.Generator.OverrideRingKey+"-watcher")
kvClient, err := kv.NewClient(t.cfg.Generator.Ring.KVStore, ring.GetPartitionRingCodec(), kvRegisterer, util_log.Logger)
if err != nil {
return nil, fmt.Errorf("creating KV store for generator partition ring watcher: %w", err)
}

t.generatorRingWatcher = ring.NewPartitionRingWatcher(
t.cfg.Generator.OverrideRingKey,
t.cfg.Generator.OverrideRingKey,
kvClient,
util_log.Logger,
prometheus.WrapRegistererWithPrefix("tempo_", reg),
)

// Expose a web page to view the partition ring state.
editor := ring.NewPartitionRingEditor(t.cfg.Generator.OverrideRingKey, kvClient)
t.Server.HTTPRouter().Path("/partition/ring").Methods("GET", "POST").
Handler(ring.NewPartitionRingPageHandler(t.generatorRingWatcher, editor))

return t.generatorRingWatcher, nil
}

func (t *App) initBlockBuilder() (services.Service, error) {
if !t.cfg.Ingest.Enabled {
return services.NewIdleService(nil, nil), nil
@@ -638,6 +693,7 @@ func (t *App) setupModuleManager() error {
mm.RegisterModule(CacheProvider, t.initCacheProvider, modules.UserInvisibleModule)
mm.RegisterModule(IngesterRing, t.initIngesterRing, modules.UserInvisibleModule)
mm.RegisterModule(MetricsGeneratorRing, t.initGeneratorRing, modules.UserInvisibleModule)
mm.RegisterModule(GeneratorRingWatcher, t.initGeneratorRingWatcher, modules.UserInvisibleModule)
mm.RegisterModule(SecondaryIngesterRing, t.initSecondaryIngesterRing, modules.UserInvisibleModule)
mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule)

@@ -649,6 +705,7 @@ func (t *App) setupModuleManager() error {
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(MetricsGenerator, t.initGenerator)
mm.RegisterModule(MetricsGeneratorNoLocalBlocks, t.initGeneratorNoLocalBlocks)
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)

mm.RegisterModule(SingleBinary, nil)
@@ -667,17 +724,19 @@ func (t *App) setupModuleManager() error {
SecondaryIngesterRing: {Server, MemberlistKV},
MetricsGeneratorRing: {Server, MemberlistKV},
PartitionRing: {MemberlistKV, Server, IngesterRing},
GeneratorRingWatcher: {MemberlistKV},

Common: {UsageReport, Server, Overrides},

// individual targets
QueryFrontend: {Common, Store, OverridesAPI},
Distributor: {Common, IngesterRing, MetricsGeneratorRing, PartitionRing},
Ingester: {Common, Store, MemberlistKV, PartitionRing},
MetricsGenerator: {Common, OptionalStore, MemberlistKV, PartitionRing},
Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing},
Compactor: {Common, Store, MemberlistKV},
BlockBuilder: {Common, Store, MemberlistKV, PartitionRing},
QueryFrontend: {Common, Store, OverridesAPI},
Distributor: {Common, IngesterRing, MetricsGeneratorRing, PartitionRing},
Ingester: {Common, Store, MemberlistKV, PartitionRing},
MetricsGenerator: {Common, OptionalStore, MemberlistKV, PartitionRing},
MetricsGeneratorNoLocalBlocks: {Common, GeneratorRingWatcher},
[Review thread on the line above]

Collaborator: The dependency between "no local blocks" and the partition ring watcher doesn't make sense to me. Shouldn't this be something like MetricsGeneratorPartitionRing?

@flxbk (Contributor, Author), Feb 26, 2025: In a different PR comment, @mdisibio suggested we could use a separate target for this metrics-generator mode. That is what I did here; however, the new mode still needs a way to watch a (configurable) partition ring, hence the dependency on GeneratorRingWatcher.
Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing},
Compactor: {Common, Store, MemberlistKV},
BlockBuilder: {Common, Store, MemberlistKV, PartitionRing},

// composite targets
SingleBinary: {Compactor, QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator, BlockBuilder},
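With this wiring, the new mode should be selectable like any other target — presumably by starting Tempo with `-target=metrics-generator-no-local-blocks` or the equivalent `target` setting in the config file. Unlike `MetricsGenerator`, it depends only on `Common` and `GeneratorRingWatcher`, so it never registers in the generator ring and never opens the backend store; a configuration sketch follows the manifest diff below.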
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/manifest.md
@@ -655,6 +655,9 @@ metrics_generator:
metrics_ingestion_time_range_slack: 30s
query_timeout: 30s
override_ring_key: metrics-generator
codec: push-bytes
disable_local_blocks: false
disable_grpc: false
ingest_concurrency: 16
instance_id: hostname
ingest:
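Putting the new fields together, a minimal configuration sketch for the stateless mode. The `target` value and the three `metrics_generator` fields come from this PR; the `ingest` block layout and the Kafka address/topic are illustrative assumptions, and `disable_local_blocks`/`disable_grpc` are shown only for clarity since `initGeneratorNoLocalBlocks` forces them on at startup:

```yaml
# Sketch only: run the generator as a stateless Kafka consumer.
target: metrics-generator-no-local-blocks

ingest:
  enabled: true          # required; the new target errors out otherwise
  kafka:
    address: kafka:9092  # illustrative
    topic: tempo-ingest  # illustrative

metrics_generator:
  codec: otlp                 # or push-bytes (the default)
  disable_local_blocks: true  # forced on by the new target
  disable_grpc: true          # forced on by the new target
```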
21 changes: 21 additions & 0 deletions modules/generator/config.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"os"
"slices"
"time"

"github.com/grafana/tempo/modules/generator/processor/localblocks"
@@ -25,8 +26,15 @@ const (
ringNameForServer = "metrics-generator"

ConsumerGroup = "metrics-generator"

// codecPushBytes refers to the codec used for decoding tempopb.PushBytesRequest
codecPushBytes = "push-bytes"
// codecOTLP refers to the codec used for decoding ptrace.Traces
codecOTLP = "otlp"
)

var validCodecs = []string{codecPushBytes, codecOTLP}

// Config for a generator.
type Config struct {
Ring RingConfig `yaml:"ring"`
@@ -41,6 +49,14 @@ type Config struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`

// Codec controls which decoder to use for data consumed from Kafka.
Codec string `yaml:"codec"`
// DisableLocalBlocks controls whether the local blocks processor should be run.
// When this flag is enabled, the processor is never instantiated.
DisableLocalBlocks bool `yaml:"disable_local_blocks"`
// DisableGRPC controls whether to run a gRPC server with the metrics generator endpoints.
DisableGRPC bool `yaml:"disable_grpc"`

// This config is dynamically injected because defined outside the generator config.
Ingest ingest.Config `yaml:"-"`
IngestConcurrency uint `yaml:"ingest_concurrency"`
@@ -64,6 +80,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.MetricsIngestionSlack = 30 * time.Second
cfg.QueryTimeout = 30 * time.Second
cfg.OverrideRingKey = generatorRingKey
cfg.Codec = codecPushBytes

hostname, err := os.Hostname()
if err != nil {
@@ -98,6 +115,10 @@ func (cfg *Config) Validate() error {
}
}

if !slices.Contains(validCodecs, cfg.Codec) {
return fmt.Errorf("invalid codec: %s, valid choices are %s", cfg.Codec, validCodecs)
}

return nil
}

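To make the new check concrete, a hedged sketch of rejecting an unknown codec. It assumes the processor checks earlier in `Validate` pass on a default config:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/tempo/modules/generator"
)

func main() {
	cfg := &generator.Config{}
	cfg.RegisterFlagsAndApplyDefaults("", flag.NewFlagSet("tempo", flag.PanicOnError))

	cfg.Codec = "protobuf" // not one of push-bytes / otlp
	if err := cfg.Validate(); err != nil {
		// invalid codec: protobuf, valid choices are [push-bytes otlp]
		fmt.Println(err)
	}
}
```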
72 changes: 41 additions & 31 deletions modules/generator/generator.go
@@ -107,31 +107,33 @@ func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Regist
logger: logger,
}

// Lifecycler and ring
ringStore, err := kv.NewClient(
cfg.Ring.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("tempo_", reg), "metrics-generator"),
g.logger,
)
if err != nil {
return nil, fmt.Errorf("create KV store client: %w", err)
}
if !cfg.DisableGRPC {
// Lifecycler and ring
ringStore, err := kv.NewClient(
cfg.Ring.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("tempo_", reg), "metrics-generator"),
g.logger,
)
if err != nil {
return nil, fmt.Errorf("create KV store client: %w", err)
}

lifecyclerCfg, err := cfg.Ring.toLifecyclerConfig()
if err != nil {
return nil, fmt.Errorf("invalid ring lifecycler config: %w", err)
}
lifecyclerCfg, err := cfg.Ring.toLifecyclerConfig()
if err != nil {
return nil, fmt.Errorf("invalid ring lifecycler config: %w", err)
}

// Define lifecycler delegates in reverse order (last to be called defined first because they're
// chained via "next delegate").
delegate := ring.BasicLifecyclerDelegate(g)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, g.logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.Ring.HeartbeatTimeout, delegate, g.logger)
// Define lifecycler delegates in reverse order (last to be called defined first because they're
// chained via "next delegate").
delegate := ring.BasicLifecyclerDelegate(g)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, g.logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.Ring.HeartbeatTimeout, delegate, g.logger)

g.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, cfg.OverrideRingKey, ringStore, delegate, g.logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
if err != nil {
return nil, fmt.Errorf("create ring lifecycler: %w", err)
g.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, cfg.OverrideRingKey, ringStore, delegate, g.logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
if err != nil {
return nil, fmt.Errorf("create ring lifecycler: %w", err)
}
}

g.Service = services.NewBasicService(g.starting, g.running, g.stopping)
@@ -152,16 +154,18 @@ func (g *Generator) starting(ctx context.Context) (err error) {
}
}()

g.subservices, err = services.NewManager(g.ringLifecycler)
if err != nil {
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
}
g.subservicesWatcher = services.NewFailureWatcher()
g.subservicesWatcher.WatchManager(g.subservices)
if !g.cfg.DisableGRPC {
g.subservices, err = services.NewManager(g.ringLifecycler)
if err != nil {
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
}
g.subservicesWatcher = services.NewFailureWatcher()
g.subservicesWatcher.WatchManager(g.subservices)

err = services.StartManagerAndAwaitHealthy(ctx, g.subservices)
if err != nil {
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
err = services.StartManagerAndAwaitHealthy(ctx, g.subservices)
if err != nil {
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
}
}

if g.cfg.Ingest.Enabled {
@@ -370,6 +374,12 @@ func (g *Generator) createInstance(id string) (*instance, error) {
}

func (g *Generator) CheckReady(_ context.Context) error {
// Always mark as ready when running without a ring, because the readiness logic
// below depends on the ring lifecycler.
if g.ringLifecycler == nil {
return nil
}

if !g.ringLifecycler.IsRegistered() {
return fmt.Errorf("metrics-generator check ready failed: not registered in the ring")
}
21 changes: 12 additions & 9 deletions modules/generator/generator_kafka.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/go-kit/log/level"

"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/prometheus/client_golang/prometheus"
@@ -107,7 +108,14 @@ func (g *Generator) readKafka(ctx context.Context) error {
// to multiple goroutines.
func (g *Generator) readCh(ctx context.Context) {
defer g.kafkaWG.Done()
d := ingest.NewDecoder()

var c ingest.GeneratorCodec
switch g.cfg.Codec {
case codecPushBytes:
c = ingest.NewPushBytesDecoder()
case codecOTLP:
c = ingest.NewOTLPDecoder()
}

for {
var r *kgo.Record
@@ -125,26 +133,21 @@ func (g *Generator) readCh(ctx context.Context) {
continue
}

d.Reset()
req, err := d.Decode(r.Value)
iterator, err := c.Decode(r.Value)
if err != nil {
level.Error(g.logger).Log("msg", "consumeKafkaChannel decode", "err", err)
continue
}

for _, tr := range req.Traces {
trace := &tempopb.Trace{}
err = trace.Unmarshal(tr.Slice)
for resourceSpans, err := range iterator {
if err != nil {
level.Error(g.logger).Log("msg", "consumeKafkaChannel unmarshal", "err", err)
continue
}

i.pushSpansFromQueue(ctx, r.Timestamp, &tempopb.PushSpansRequest{
Batches: trace.ResourceSpans,
Batches: resourceSpans,
})

tempopb.ReuseByteSlices([][]byte{tr.Slice})
}
}
}
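The loop above ranges over the value returned by `Decode`, which implies an iterator-based codec contract in `pkg/ingest`. That interface is not part of this diff; the following is a hypothetical sketch of its shape using Go 1.23 range-over-func iterators (the method signature and the trace proto import are assumptions):

```go
package ingest

import (
	"iter"

	v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

// GeneratorCodec decodes one raw Kafka record into batches of resource spans.
// Returning an iterator lets implementations yield batches lazily: an OTLP
// decoder can unmarshal the payload once and yield it, while a push-bytes
// decoder can unmarshal each embedded trace as it is consumed.
type GeneratorCodec interface {
	Decode(data []byte) (iter.Seq2[[]*v1.ResourceSpans, error], error)
}
```

This would also explain why the explicit `tempopb.ReuseByteSlices` call disappears from `readCh`: buffer reuse presumably becomes a codec-internal concern.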
17 changes: 16 additions & 1 deletion modules/generator/instance.go
@@ -201,7 +201,7 @@ func (i *instance) updateSubprocessors(desiredProcessors map[string]struct{}, de
}

func (i *instance) updateProcessors() error {
desiredProcessors := i.overrides.MetricsGeneratorProcessors(i.instanceID)
desiredProcessors := i.filterDisabledProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID))
desiredCfg, err := i.cfg.Processor.copyWithOverrides(i.overrides, i.instanceID)
if err != nil {
return err
@@ -253,6 +253,21 @@ func (i *instance) updateProcessors() error {
return nil
}

// filterDisabledProcessors removes processors that should never be instantiated
// according to the generator's configuration from the given set of processors.
func (i *instance) filterDisabledProcessors(processors map[string]struct{}) map[string]struct{} {
// If no processors are disabled, do not apply any filtering.
if !i.cfg.DisableLocalBlocks {
return processors
}

// Otherwise, do not instantiate the localblocks processor.
filteredProcessors := maps.Clone(processors)
delete(filteredProcessors, localblocks.Name)

return filteredProcessors
}

// diffProcessors compares the existing processors with the desired processors and config.
// Must be called under a read lock.
func (i *instance) diffProcessors(desiredProcessors map[string]struct{}, desiredCfg ProcessorConfig) (toAdd, toRemove, toReplace []string, err error) {
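A self-contained illustration of the same filtering idea (standalone sketch, not the actual method; processor names are illustrative):

```go
package main

import (
	"fmt"
	"maps"
)

// filterDisabled mirrors filterDisabledProcessors: clone first, then drop the
// disabled entry, so the caller's map is never mutated.
func filterDisabled(processors map[string]struct{}, disabled string) map[string]struct{} {
	filtered := maps.Clone(processors)
	delete(filtered, disabled)
	return filtered
}

func main() {
	desired := map[string]struct{}{
		"local-blocks": {}, // stand-in for localblocks.Name
		"span-metrics": {},
	}
	fmt.Println(filterDisabled(desired, "local-blocks")) // map[span-metrics:{}]
	fmt.Println(desired)                                 // input map left intact
}
```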