Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a524eb8
Add new concurrency config options and validate
mdisibio Feb 13, 2025
96e7b1e
Reduce allocations of IDmap when replaying wal blocks since we know t…
mdisibio Feb 13, 2025
88c7273
Generator read from kafka concurrency, add shared ingest lag metric, …
mdisibio Feb 13, 2025
0dfcd36
Memoize spanmetrics sanitizelabelname, move to better location
mdisibio Feb 13, 2025
7db90df
Moved to shared queue for localblocks wal completion, allow concurren…
mdisibio Feb 13, 2025
b53488c
Honor max live traces in non-flushing local blocks processor
mdisibio Feb 14, 2025
15f2bd0
Add metric for enqueue time
mdisibio Feb 14, 2025
8169501
Add missing mutex lock for enqueuing on replay
mdisibio Feb 14, 2025
921d471
Fix mutex lock while reloading blocks
mdisibio Feb 14, 2025
531ad75
Increase default concurrency
mdisibio Feb 14, 2025
3c8d3da
Simplify local blocks complete queue
mdisibio Feb 14, 2025
e3f886c
Remove uniqify, fix test
mdisibio Feb 14, 2025
820fcbc
Switch completequeue to reference counting and shut it down when last…
mdisibio Feb 14, 2025
e16565d
cache rename/cleanup
mdisibio Feb 18, 2025
67b97ba
Cleanup/denoising PR
mdisibio Feb 18, 2025
d00df8a
Lint/cleanup
mdisibio Feb 18, 2025
62a180f
lint
mdisibio Feb 18, 2025
cae66d1
Update config manifest
mdisibio Feb 18, 2025
e67f59b
fix race condition
mdisibio Feb 18, 2025
0cc26b9
cleanup
mdisibio Feb 19, 2025
0f45e6c
review feedback
mdisibio Feb 20, 2025
cabf9cb
Update config manifest
mdisibio Feb 20, 2025
bc4be1b
changelog
mdisibio Feb 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
* [ENHANCEMENT] Improve metrics-generator performance and stability by applying queue back pressure and concurrency [#4721](https://github.com/grafana/tempo/pull/4721) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance [#4596](https://github.com/grafana/tempo/pull/4596) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by not using WAL stage [#4647](https://github.com/grafana/tempo/pull/4647) [#4671](https://github.com/grafana/tempo/pull/4671) (@mdisibio)
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,10 @@ metrics_generator:
trace_idle_period: 10s
max_block_duration: 1m0s
max_block_bytes: 500000000
concurrency: 4
complete_block_timeout: 1h0m0s
max_live_traces: 0
max_live_traces_bytes: 250000000
filter_server_spans: true
flush_to_storage: false
concurrent_blocks: 10
Expand Down Expand Up @@ -653,6 +655,7 @@ metrics_generator:
metrics_ingestion_time_range_slack: 30s
query_timeout: 30s
override_ring_key: metrics-generator
ingest_concurrency: 16
instance_id: hostname
ingest:
enabled: false
Expand Down
26 changes: 20 additions & 6 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package generator

import (
"errors"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -41,8 +42,9 @@ type Config struct {
OverrideRingKey string `yaml:"override_ring_key"`

// This config is dynamically injected because defined outside the generator config.
Ingest ingest.Config `yaml:"-"`
InstanceID string `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
Ingest ingest.Config `yaml:"-"`
IngestConcurrency uint `yaml:"ingest_concurrency"`
InstanceID string `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
}

// RegisterFlagsAndApplyDefaults registers the flags.
Expand All @@ -55,6 +57,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.TracesWAL.Version = encoding.DefaultEncoding().Version()
cfg.TracesQueryWAL.RegisterFlags(f)
cfg.TracesQueryWAL.Version = encoding.DefaultEncoding().Version()
cfg.Ingest.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.IngestConcurrency = 16

// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
Expand All @@ -70,10 +74,16 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
}

func (cfg *Config) Validate() error {
if cfg.Ingest.Enabled {
if err := cfg.Ingest.Kafka.Validate(); err != nil {
return err
}
if err := cfg.Ingest.Validate(); err != nil {
return err
}

if cfg.IngestConcurrency == 0 {
return errors.New("ingest concurrency must be greater than zero")
}

if err := cfg.Processor.Validate(); err != nil {
return err
}

// Only validate if being used
Expand Down Expand Up @@ -103,6 +113,10 @@ func (cfg *ProcessorConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag
cfg.LocalBlocks.RegisterFlagsAndApplyDefaults(prefix, f)
}

// Validate checks the processor configuration and returns the first error
// encountered. It currently delegates to the local-blocks processor config,
// the only sub-config with validation rules at this level.
func (cfg *ProcessorConfig) Validate() error {
return cfg.LocalBlocks.Validate()
}

// copyWithOverrides creates a copy of the config using values set in the overrides.
func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userID string) (ProcessorConfig, error) {
copyCfg := *cfg
Expand Down
1 change: 1 addition & 0 deletions modules/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Generator struct {
reg prometheus.Registerer
logger log.Logger

kafkaCh chan *kgo.Record
kafkaWG sync.WaitGroup
kafkaStop func()
kafkaClient *ingest.Client
Expand Down
77 changes: 68 additions & 9 deletions modules/generator/generator_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,35 @@ import (
"errors"
"sort"
"strconv"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
)

// metricEnqueueTime accumulates the time spent blocked while handing fetched
// kafka records to the processing channel. A rising rate suggests the
// concurrent readers cannot keep up with the fetch rate (back pressure).
var metricEnqueueTime = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Subsystem: "metrics_generator",
Name: "enqueue_time_seconds_total",
Help: "The total amount of time spent waiting to enqueue for processing",
})

func (g *Generator) startKafka() {
g.kafkaCh = make(chan *kgo.Record, g.cfg.IngestConcurrency)

// Create context that will be used to stop the goroutines.
var ctx context.Context
ctx, g.kafkaStop = context.WithCancel(context.Background())

for i := uint(0); i < g.cfg.IngestConcurrency; i++ {
g.kafkaWG.Add(1)
go g.readCh(ctx)
}

g.kafkaWG.Add(1)
go g.listenKafka(ctx)
ingest.ExportPartitionLagMetrics(ctx, g.kafkaAdm, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions)
Expand All @@ -24,6 +42,7 @@ func (g *Generator) startKafka() {
// stopKafka shuts down kafka consumption. The order matters: first cancel the
// shared context so the listener and reader goroutines stop, then wait for
// them all to exit via kafkaWG, and only then close kafkaCh. Closing last is
// what makes it safe — presumably every sender on kafkaCh runs under kafkaWG
// (see startKafka), so no send can race the close. TODO confirm no other
// goroutine writes to kafkaCh outside kafkaWG.
func (g *Generator) stopKafka() {
g.kafkaStop()
g.kafkaWG.Wait()
close(g.kafkaCh)
}

func (g *Generator) listenKafka(ctx context.Context) {
Expand All @@ -49,45 +68,85 @@ func (g *Generator) listenKafka(ctx context.Context) {
}

func (g *Generator) readKafka(ctx context.Context) error {
d := ingest.NewDecoder()

fetches := g.kafkaClient.PollFetches(ctx)
fetches.EachError(func(_ string, _ int32, err error) {
if !errors.Is(err, context.Canceled) {
level.Error(g.logger).Log("msg", "failed to fetch records", "err", err)
}
})
if err := fetches.Err(); err != nil && !errors.Is(err, context.Canceled) {
return err
}

// Metric lag based on first message in each partition.
// This balances overhead with granularity.
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
if len(p.Records) > 0 {
lag := time.Since(p.Records[0].Timestamp)
ingest.SetPartitionLagSeconds(g.cfg.Ingest.Kafka.ConsumerGroup, int(p.Partition), lag)
}
})

start := time.Now()

for iter := fetches.RecordIter(); !iter.Done(); {
r := iter.Next()
select {
case g.kafkaCh <- iter.Next():
case <-ctx.Done():
return ctx.Err()
}
}

metricEnqueueTime.Add(time.Since(start).Seconds())
Comment thread
javiermolinar marked this conversation as resolved.

return nil
}

// readCh reads records from the internal channel.
// This allows for offloading the expensive proto unmarshal
// to multiple goroutines.
func (g *Generator) readCh(ctx context.Context) {
defer g.kafkaWG.Done()
d := ingest.NewDecoder()

for {
var r *kgo.Record
select {
case r = <-g.kafkaCh:
case <-ctx.Done():
return
}

tenant := string(r.Key)

i, err := g.getOrCreateInstance(tenant)
if err != nil {
return err
level.Error(g.logger).Log("msg", "consumeKafkaChannel getOrCreateInstance", "err", err)
continue
}

d.Reset()
req, err := d.Decode(r.Value)
if err != nil {
return err
level.Error(g.logger).Log("msg", "consumeKafkaChannel decode", "err", err)
continue
}

for _, tr := range req.Traces {
trace := &tempopb.Trace{}
err = trace.Unmarshal(tr.Slice)
if err != nil {
return err
level.Error(g.logger).Log("msg", "consumeKafkaChannel unmarshal", "err", err)
continue
}

i.pushSpansFromQueue(ctx, &tempopb.PushSpansRequest{
i.pushSpansFromQueue(ctx, r.Timestamp, &tempopb.PushSpansRequest{
Comment thread
mapno marked this conversation as resolved.
Batches: trace.ResourceSpans,
})

tempopb.ReuseByteSlices([][]byte{tr.Slice})
}
}

return nil
}

func (g *Generator) getAssignedActivePartitions() []int32 {
Expand Down
5 changes: 3 additions & 2 deletions modules/generator/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ overrides:
require.NoError(t, services.StartAndAwaitRunning(context.Background(), o))

generatorConfig := &Config{}
generatorConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
generatorConfig.Storage.Path = t.TempDir()
generatorConfig.Ring.KVStore.Store = "inmemory"
generatorConfig.Processor.SpanMetrics.RegisterFlagsAndApplyDefaults("", nil)
g, err := New(generatorConfig, o, prometheus.NewRegistry(), nil, nil, newTestLogger(t))
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), g))
Expand Down Expand Up @@ -155,7 +155,7 @@ func BenchmarkPushSpans(b *testing.B) {
"span-metrics": {},
"service-graphs": {},
},
spanMetricsEnableTargetInfo: false,
spanMetricsEnableTargetInfo: true,
spanMetricsTargetInfoExcludedDimensions: []string{"excluded}"},
}
)
Expand Down Expand Up @@ -211,6 +211,7 @@ func BenchmarkPushSpans(b *testing.B) {
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
b.ReportMetric(float64(mem.HeapInuse), "heap_in_use")
b.ReportMetric(float64(mem.HeapAlloc), "heap_alloc")
}

func BenchmarkCollect(b *testing.B) {
Expand Down
6 changes: 4 additions & 2 deletions modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error
if i.traceQueryWAL != nil {
nonFlushingConfig := cfg.LocalBlocks
nonFlushingConfig.FlushToStorage = false
nonFlushingConfig.AssertMaxLiveTraces = true
nonFlushingConfig.AdjustTimeRangeForSlack = false
i.queuebasedLocalBlocks, err = localblocks.New(nonFlushingConfig, i.instanceID, i.traceQueryWAL, i.writer, i.overrides)
if err != nil {
return err
Expand Down Expand Up @@ -377,7 +379,7 @@ func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest)
}
}

func (i *instance) pushSpansFromQueue(ctx context.Context, req *tempopb.PushSpansRequest) {
func (i *instance) pushSpansFromQueue(ctx context.Context, ts time.Time, req *tempopb.PushSpansRequest) {
i.preprocessSpans(req)
i.processorsMtx.RLock()
defer i.processorsMtx.RUnlock()
Expand All @@ -392,7 +394,7 @@ func (i *instance) pushSpansFromQueue(ctx context.Context, req *tempopb.PushSpan

// Now we push to the non-flushing local blocks if present
if i.queuebasedLocalBlocks != nil {
i.queuebasedLocalBlocks.PushSpans(ctx, req)
i.queuebasedLocalBlocks.DeterministicPush(ts, req)
}
}

Expand Down
1 change: 1 addition & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorServiceGraphsEnableVirtualNodeLabel(userID string) bool
MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(userID string) []string
DedicatedColumns(userID string) backend.DedicatedColumns
MaxLocalTracesPerUser(userID string) int
MaxBytesPerTrace(userID string) int
UnsafeQueryHints(userID string) bool
}
Expand Down
5 changes: 5 additions & 0 deletions modules/generator/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type mockOverrides struct {
localBlocksTraceIdlePeriod time.Duration
localBlocksCompleteBlockTimeout time.Duration
dedicatedColumns backend.DedicatedColumns
maxLocalTraces int
maxBytesPerTrace int
unsafeQueryHints bool
nativeHistograms overrides.HistogramMethod
Expand Down Expand Up @@ -152,6 +153,10 @@ func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
return m.dedicatedColumns
}

// MaxLocalTracesPerUser returns the mock's configured per-tenant live-trace
// limit, satisfying the metricsGeneratorOverrides interface for tests.
// The tenant ID argument is ignored; the same value is returned for all users.
func (m *mockOverrides) MaxLocalTracesPerUser(string) int {
return m.maxLocalTraces
}

func (m *mockOverrides) MaxBytesPerTrace(string) int {
return m.maxBytesPerTrace
}
Expand Down
Loading