Skip to content

Commit 9064e84

Browse files
authored
[Metrics Generator] Allow running on a different source of data (#4686)
* [Metrics Generator] Allow running on a different source of data
* fix lint errors
* Address review feedback
* regenerate manifest
* do not start ring lifecycler
* make partition ring watcher a separate module
* do not join ring when gRPC is disabled
* changelog
1 parent 122be75 commit 9064e84

10 files changed

Lines changed: 280 additions & 57 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
4040
* [ENHANCEMENT] Improve TraceQL perf by reverting EqualRowNumber to an inlineable function. [#4705](https://github.com/grafana/tempo/pull/4705) (@joe-elliott)
4141
* [ENHANCEMENT] rhythm: fair partition consumption in blockbuilders [#4655](https://github.com/grafana/tempo/pull/4655) (@javiermolinar)
4242
* [ENHANCEMENT] TraceQL: add support for querying by parent span id [#4692](https://github.com/grafana/tempo/pull/4692) (@ie-pham)
43+
* [ENHANCEMENT] metrics-generator: allow skipping localblocks and consuming from a different source of data [#4686](https://github.com/grafana/tempo/pull/4686) (@flxbk)
4344
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4576](https://github.com/grafana/tempo/pull/4576) (@joe-elliott)
4445
Correctly copy exemplars for metrics like `| rate()` when gRPC streaming.
4546
* [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio)

cmd/tempo/app/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type App struct {
6767
readRings map[string]*ring.Ring
6868
partitionRing *ring.PartitionInstanceRing
6969
partitionRingWatcher *ring.PartitionRingWatcher
70+
generatorRingWatcher *ring.PartitionRingWatcher
7071
Overrides overrides.Service
7172
distributor *distributor.Distributor
7273
querier *querier.Querier

cmd/tempo/app/modules.go

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,17 @@ const (
6767
SecondaryIngesterRing string = "secondary-ring"
6868
MetricsGeneratorRing string = "metrics-generator-ring"
6969
PartitionRing string = "partition-ring"
70+
GeneratorRingWatcher string = "generator-ring-watcher"
7071

7172
// individual targets
72-
Distributor string = "distributor"
73-
Ingester string = "ingester"
74-
MetricsGenerator string = "metrics-generator"
75-
Querier string = "querier"
76-
QueryFrontend string = "query-frontend"
77-
Compactor string = "compactor"
78-
BlockBuilder string = "block-builder"
73+
Distributor string = "distributor"
74+
Ingester string = "ingester"
75+
MetricsGenerator string = "metrics-generator"
76+
MetricsGeneratorNoLocalBlocks string = "metrics-generator-no-local-blocks"
77+
Querier string = "querier"
78+
QueryFrontend string = "query-frontend"
79+
Compactor string = "compactor"
80+
BlockBuilder string = "block-builder"
7981

8082
// composite targets
8183
SingleBinary string = "all"
@@ -328,6 +330,59 @@ func (t *App) initGenerator() (services.Service, error) {
328330
return t.generator, nil
329331
}
330332

333+
func (t *App) initGeneratorNoLocalBlocks() (services.Service, error) {
334+
reg := prometheus.DefaultRegisterer
335+
336+
t.cfg.Generator.Ingest = t.cfg.Ingest
337+
338+
// In this mode, the generator runs as a stateless queue consumer that reads from
339+
// Kafka and remote writes to a Prometheus-compatible metrics store.
340+
if !t.cfg.Ingest.Enabled {
341+
return nil, errors.New("ingest storage must be enabled to run metrics generator in this mode")
342+
}
343+
// The localblocks processor is disabled in this mode.
344+
t.cfg.Generator.DisableLocalBlocks = true
345+
// The store is used only by the localblocks processor. We don't need it when
346+
// running with that processor disabled so we keep the default zero value.
347+
var store tempo_storage.Store
348+
// In this mode, the generator does not need to become available to serve
349+
// queries, so we can skip setting up a gRPC server.
350+
t.cfg.Generator.DisableGRPC = true
351+
352+
var err error
353+
t.generator, err = generator.New(&t.cfg.Generator, t.Overrides, reg, t.generatorRingWatcher, store, log.Logger)
354+
if err != nil {
355+
return nil, fmt.Errorf("failed to create metrics-generator: %w", err)
356+
}
357+
358+
return t.generator, nil
359+
}
360+
361+
func (t *App) initGeneratorRingWatcher() (services.Service, error) {
362+
reg := prometheus.DefaultRegisterer
363+
364+
kvRegisterer := kv.RegistererWithKVName(reg, t.cfg.Generator.OverrideRingKey+"-watcher")
365+
kvClient, err := kv.NewClient(t.cfg.Generator.Ring.KVStore, ring.GetPartitionRingCodec(), kvRegisterer, util_log.Logger)
366+
if err != nil {
367+
return nil, fmt.Errorf("creating KV store for generator partition ring watcher: %w", err)
368+
}
369+
370+
t.generatorRingWatcher = ring.NewPartitionRingWatcher(
371+
t.cfg.Generator.OverrideRingKey,
372+
t.cfg.Generator.OverrideRingKey,
373+
kvClient,
374+
util_log.Logger,
375+
prometheus.WrapRegistererWithPrefix("tempo_", reg),
376+
)
377+
378+
// Expose a web page to view the partition ring state.
379+
editor := ring.NewPartitionRingEditor(t.cfg.Generator.OverrideRingKey, kvClient)
380+
t.Server.HTTPRouter().Path("/partition/ring").Methods("GET", "POST").
381+
Handler(ring.NewPartitionRingPageHandler(t.generatorRingWatcher, editor))
382+
383+
return t.generatorRingWatcher, nil
384+
}
385+
331386
func (t *App) initBlockBuilder() (services.Service, error) {
332387
if !t.cfg.Ingest.Enabled {
333388
return services.NewIdleService(nil, nil), nil
@@ -638,6 +693,7 @@ func (t *App) setupModuleManager() error {
638693
mm.RegisterModule(CacheProvider, t.initCacheProvider, modules.UserInvisibleModule)
639694
mm.RegisterModule(IngesterRing, t.initIngesterRing, modules.UserInvisibleModule)
640695
mm.RegisterModule(MetricsGeneratorRing, t.initGeneratorRing, modules.UserInvisibleModule)
696+
mm.RegisterModule(GeneratorRingWatcher, t.initGeneratorRingWatcher, modules.UserInvisibleModule)
641697
mm.RegisterModule(SecondaryIngesterRing, t.initSecondaryIngesterRing, modules.UserInvisibleModule)
642698
mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule)
643699

@@ -649,6 +705,7 @@ func (t *App) setupModuleManager() error {
649705
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
650706
mm.RegisterModule(Compactor, t.initCompactor)
651707
mm.RegisterModule(MetricsGenerator, t.initGenerator)
708+
mm.RegisterModule(MetricsGeneratorNoLocalBlocks, t.initGeneratorNoLocalBlocks)
652709
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
653710

654711
mm.RegisterModule(SingleBinary, nil)
@@ -667,17 +724,19 @@ func (t *App) setupModuleManager() error {
667724
SecondaryIngesterRing: {Server, MemberlistKV},
668725
MetricsGeneratorRing: {Server, MemberlistKV},
669726
PartitionRing: {MemberlistKV, Server, IngesterRing},
727+
GeneratorRingWatcher: {MemberlistKV},
670728

671729
Common: {UsageReport, Server, Overrides},
672730

673731
// individual targets
674-
QueryFrontend: {Common, Store, OverridesAPI},
675-
Distributor: {Common, IngesterRing, MetricsGeneratorRing, PartitionRing},
676-
Ingester: {Common, Store, MemberlistKV, PartitionRing},
677-
MetricsGenerator: {Common, OptionalStore, MemberlistKV, PartitionRing},
678-
Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing},
679-
Compactor: {Common, Store, MemberlistKV},
680-
BlockBuilder: {Common, Store, MemberlistKV, PartitionRing},
732+
QueryFrontend: {Common, Store, OverridesAPI},
733+
Distributor: {Common, IngesterRing, MetricsGeneratorRing, PartitionRing},
734+
Ingester: {Common, Store, MemberlistKV, PartitionRing},
735+
MetricsGenerator: {Common, OptionalStore, MemberlistKV, PartitionRing},
736+
MetricsGeneratorNoLocalBlocks: {Common, GeneratorRingWatcher},
737+
Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing},
738+
Compactor: {Common, Store, MemberlistKV},
739+
BlockBuilder: {Common, Store, MemberlistKV, PartitionRing},
681740

682741
// composite targets
683742
SingleBinary: {Compactor, QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator, BlockBuilder},

docs/sources/tempo/configuration/manifest.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,9 @@ metrics_generator:
655655
metrics_ingestion_time_range_slack: 30s
656656
query_timeout: 30s
657657
override_ring_key: metrics-generator
658+
codec: push-bytes
659+
disable_local_blocks: false
660+
disable_grpc: false
658661
ingest_concurrency: 16
659662
instance_id: hostname
660663
ingest:

modules/generator/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"os"
8+
"slices"
89
"time"
910

1011
"github.com/grafana/tempo/modules/generator/processor/localblocks"
@@ -25,8 +26,15 @@ const (
2526
ringNameForServer = "metrics-generator"
2627

2728
ConsumerGroup = "metrics-generator"
29+
30+
// codecPushBytes refers to the codec used for decoding tempopb.PushBytesRequest
31+
codecPushBytes = "push-bytes"
32+
// codecOTLP refers to the codec used for decoding ptrace.Traces
33+
codecOTLP = "otlp"
2834
)
2935

36+
var validCodecs = []string{codecPushBytes, codecOTLP}
37+
3038
// Config for a generator.
3139
type Config struct {
3240
Ring RingConfig `yaml:"ring"`
@@ -41,6 +49,14 @@ type Config struct {
4149
QueryTimeout time.Duration `yaml:"query_timeout"`
4250
OverrideRingKey string `yaml:"override_ring_key"`
4351

52+
// Codec controls which decoder to use for data consumed from Kafka.
53+
Codec string `yaml:"codec"`
54+
// DisableLocalBlocks controls whether the local blocks processor should be run.
55+
// When this flag is enabled, the processor is never instantiated.
56+
DisableLocalBlocks bool `yaml:"disable_local_blocks"`
57+
// DisableGRPC controls whether to run a gRPC server with the metrics generator endpoints.
58+
DisableGRPC bool `yaml:"disable_grpc"`
59+
4460
// This config is dynamically injected because defined outside the generator config.
4561
Ingest ingest.Config `yaml:"-"`
4662
IngestConcurrency uint `yaml:"ingest_concurrency"`
@@ -64,6 +80,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
6480
cfg.MetricsIngestionSlack = 30 * time.Second
6581
cfg.QueryTimeout = 30 * time.Second
6682
cfg.OverrideRingKey = generatorRingKey
83+
cfg.Codec = codecPushBytes
6784

6885
hostname, err := os.Hostname()
6986
if err != nil {
@@ -98,6 +115,10 @@ func (cfg *Config) Validate() error {
98115
}
99116
}
100117

118+
if !slices.Contains(validCodecs, cfg.Codec) {
119+
return fmt.Errorf("invalid codec: %s, valid choices are %s", cfg.Codec, validCodecs)
120+
}
121+
101122
return nil
102123
}
103124

modules/generator/generator.go

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -107,31 +107,33 @@ func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Regist
107107
logger: logger,
108108
}
109109

110-
// Lifecycler and ring
111-
ringStore, err := kv.NewClient(
112-
cfg.Ring.KVStore,
113-
ring.GetCodec(),
114-
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("tempo_", reg), "metrics-generator"),
115-
g.logger,
116-
)
117-
if err != nil {
118-
return nil, fmt.Errorf("create KV store client: %w", err)
119-
}
110+
if !cfg.DisableGRPC {
111+
// Lifecycler and ring
112+
ringStore, err := kv.NewClient(
113+
cfg.Ring.KVStore,
114+
ring.GetCodec(),
115+
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("tempo_", reg), "metrics-generator"),
116+
g.logger,
117+
)
118+
if err != nil {
119+
return nil, fmt.Errorf("create KV store client: %w", err)
120+
}
120121

121-
lifecyclerCfg, err := cfg.Ring.toLifecyclerConfig()
122-
if err != nil {
123-
return nil, fmt.Errorf("invalid ring lifecycler config: %w", err)
124-
}
122+
lifecyclerCfg, err := cfg.Ring.toLifecyclerConfig()
123+
if err != nil {
124+
return nil, fmt.Errorf("invalid ring lifecycler config: %w", err)
125+
}
125126

126-
// Define lifecycler delegates in reverse order (last to be called defined first because they're
127-
// chained via "next delegate").
128-
delegate := ring.BasicLifecyclerDelegate(g)
129-
delegate = ring.NewLeaveOnStoppingDelegate(delegate, g.logger)
130-
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.Ring.HeartbeatTimeout, delegate, g.logger)
127+
// Define lifecycler delegates in reverse order (last to be called defined first because they're
128+
// chained via "next delegate").
129+
delegate := ring.BasicLifecyclerDelegate(g)
130+
delegate = ring.NewLeaveOnStoppingDelegate(delegate, g.logger)
131+
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.Ring.HeartbeatTimeout, delegate, g.logger)
131132

132-
g.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, cfg.OverrideRingKey, ringStore, delegate, g.logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
133-
if err != nil {
134-
return nil, fmt.Errorf("create ring lifecycler: %w", err)
133+
g.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, cfg.OverrideRingKey, ringStore, delegate, g.logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
134+
if err != nil {
135+
return nil, fmt.Errorf("create ring lifecycler: %w", err)
136+
}
135137
}
136138

137139
g.Service = services.NewBasicService(g.starting, g.running, g.stopping)
@@ -152,16 +154,18 @@ func (g *Generator) starting(ctx context.Context) (err error) {
152154
}
153155
}()
154156

155-
g.subservices, err = services.NewManager(g.ringLifecycler)
156-
if err != nil {
157-
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
158-
}
159-
g.subservicesWatcher = services.NewFailureWatcher()
160-
g.subservicesWatcher.WatchManager(g.subservices)
157+
if !g.cfg.DisableGRPC {
158+
g.subservices, err = services.NewManager(g.ringLifecycler)
159+
if err != nil {
160+
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
161+
}
162+
g.subservicesWatcher = services.NewFailureWatcher()
163+
g.subservicesWatcher.WatchManager(g.subservices)
161164

162-
err = services.StartManagerAndAwaitHealthy(ctx, g.subservices)
163-
if err != nil {
164-
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
165+
err = services.StartManagerAndAwaitHealthy(ctx, g.subservices)
166+
if err != nil {
167+
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
168+
}
165169
}
166170

167171
if g.cfg.Ingest.Enabled {
@@ -370,6 +374,12 @@ func (g *Generator) createInstance(id string) (*instance, error) {
370374
}
371375

372376
func (g *Generator) CheckReady(_ context.Context) error {
377+
// Always mark as ready when running without a ring, because the readiness logic
378+
// below depends on the ring lifecycler.
379+
if g.ringLifecycler == nil {
380+
return nil
381+
}
382+
373383
if !g.ringLifecycler.IsRegistered() {
374384
return fmt.Errorf("metrics-generator check ready failed: not registered in the ring")
375385
}

modules/generator/generator_kafka.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/go-kit/log/level"
11+
1112
"github.com/grafana/tempo/pkg/ingest"
1213
"github.com/grafana/tempo/pkg/tempopb"
1314
"github.com/prometheus/client_golang/prometheus"
@@ -107,7 +108,14 @@ func (g *Generator) readKafka(ctx context.Context) error {
107108
// to multiple goroutines.
108109
func (g *Generator) readCh(ctx context.Context) {
109110
defer g.kafkaWG.Done()
110-
d := ingest.NewDecoder()
111+
112+
var c ingest.GeneratorCodec
113+
switch g.cfg.Codec {
114+
case codecPushBytes:
115+
c = ingest.NewPushBytesDecoder()
116+
case codecOTLP:
117+
c = ingest.NewOTLPDecoder()
118+
}
111119

112120
for {
113121
var r *kgo.Record
@@ -125,26 +133,21 @@ func (g *Generator) readCh(ctx context.Context) {
125133
continue
126134
}
127135

128-
d.Reset()
129-
req, err := d.Decode(r.Value)
136+
iterator, err := c.Decode(r.Value)
130137
if err != nil {
131138
level.Error(g.logger).Log("msg", "consumeKafkaChannel decode", "err", err)
132139
continue
133140
}
134141

135-
for _, tr := range req.Traces {
136-
trace := &tempopb.Trace{}
137-
err = trace.Unmarshal(tr.Slice)
142+
for resourceSpans, err := range iterator {
138143
if err != nil {
139144
level.Error(g.logger).Log("msg", "consumeKafkaChannel unmarshal", "err", err)
140145
continue
141146
}
142147

143148
i.pushSpansFromQueue(ctx, r.Timestamp, &tempopb.PushSpansRequest{
144-
Batches: trace.ResourceSpans,
149+
Batches: resourceSpans,
145150
})
146-
147-
tempopb.ReuseByteSlices([][]byte{tr.Slice})
148151
}
149152
}
150153
}

modules/generator/instance.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (i *instance) updateSubprocessors(desiredProcessors map[string]struct{}, de
201201
}
202202

203203
func (i *instance) updateProcessors() error {
204-
desiredProcessors := i.overrides.MetricsGeneratorProcessors(i.instanceID)
204+
desiredProcessors := i.filterDisabledProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID))
205205
desiredCfg, err := i.cfg.Processor.copyWithOverrides(i.overrides, i.instanceID)
206206
if err != nil {
207207
return err
@@ -253,6 +253,21 @@ func (i *instance) updateProcessors() error {
253253
return nil
254254
}
255255

256+
// filterDisabledProcessors removes processors that should never be instantiated
257+
// according to the generator's configuration from the given set of processors.
258+
func (i *instance) filterDisabledProcessors(processors map[string]struct{}) map[string]struct{} {
259+
// If no processors are disabled, do not apply any filtering.
260+
if !i.cfg.DisableLocalBlocks {
261+
return processors
262+
}
263+
264+
// Otherwise, do not instantiate the localblocks processor.
265+
filteredProcessors := maps.Clone(processors)
266+
delete(filteredProcessors, localblocks.Name)
267+
268+
return filteredProcessors
269+
}
270+
256271
// diffProcessors compares the existing processors with the desired processors and config.
257272
// Must be called under a read lock.
258273
func (i *instance) diffProcessors(desiredProcessors map[string]struct{}, desiredCfg ProcessorConfig) (toAdd, toRemove, toReplace []string, err error) {

0 commit comments

Comments (0)