Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [ENHANCEMENT] TraceQL metrics performance increase for simple queries [#5247](https://github.com/grafana/tempo/pull/5247) (@mdisibio)
* [ENHANCEMENT] TraceQL search and metrics performance increase [#5280](https://github.com/grafana/tempo/pull/5280) (@mdisibio)
* [ENHANCEMENT] Traceql performance improvement [#5218](https://github.com/grafana/tempo/pull/5218) (@mdisibio)
* [ENHANCEMENT] Implement a listOffset by partition client [#5415](https://github.com/grafana/tempo/pull/5415) (@javiermolinar)
* [ENHANCEMENT] Align traceql attribute struct for better performance [#5240](https://github.com/grafana/tempo/pull/5240) (@mdisibio)
* [ENHANCEMENT] Enable HTTP writes in the multi-tenant example [#5297](https://github.com/grafana/tempo/pull/5297)
* [ENHANCEMENT] Drop invalid prometheus label names in spanmetrics processor [#5122](https://github.com/grafana/tempo/pull/5122) (@KyriosGN0)
Expand Down
22 changes: 13 additions & 9 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ type BlockBuilder struct {
logger log.Logger
cfg Config

kafkaClient *kgo.Client
kadm *kadm.Client
decoder *ingest.Decoder
partitionRing ring.PartitionRingReader
kafkaClient *kgo.Client
partitionOffsetClient *ingest.PartitionOffsetClient
kadm *kadm.Client
decoder *ingest.Decoder
partitionRing ring.PartitionRingReader

overrides Overrides
enc encoding.VersionedEncoding
Expand Down Expand Up @@ -158,7 +159,7 @@ func New(

func (b *BlockBuilder) starting(ctx context.Context) (err error) {
level.Info(b.logger).Log("msg", "block builder starting")

topic := b.cfg.IngestStorageConfig.Kafka.Topic
b.enc = encoding.DefaultEncoding()
if version := b.cfg.BlockConfig.BlockCfg.Version; version != "" {
b.enc, err = encoding.FromVersion(version)
Expand All @@ -181,6 +182,8 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
return fmt.Errorf("failed to create kafka reader client: %w", err)
}

b.partitionOffsetClient = ingest.NewPartitionOffsetClient(b.kafkaClient, topic)

boff := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Minute, // If there is a network hiccup, we prefer to wait longer retrying, than fail the service.
Expand All @@ -203,7 +206,7 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {

ingest.ExportPartitionLagMetrics(
ctx,
b.kadm,
b.kafkaClient,
b.logger,
b.cfg.IngestStorageConfig,
b.getAssignedPartitions,
Expand Down Expand Up @@ -238,6 +241,7 @@ func (b *BlockBuilder) running(ctx context.Context) error {
// all the partitions' lag is less than the cycle duration. When that happens, it returns the time to wait before another consuming cycle, based on the last record timestamp
func (b *BlockBuilder) consume(ctx context.Context) (time.Duration, error) {
partitions := b.getAssignedPartitions()

ctx, span := tracer.Start(ctx, "blockbuilder.consume", trace.WithAttributes(attribute.String("active_partitions", formatActivePartitions(partitions))))
defer span.End()

Expand Down Expand Up @@ -521,7 +525,7 @@ func (b *BlockBuilder) fetchPartitions(ctx context.Context, partitions []int32)
MaxRetries: 5,
})
for boff.Ongoing() {
commits, endsOffsets, err = b.getPartitionOffsets(ctx)
commits, endsOffsets, err = b.getPartitionOffsets(ctx, partitions)
if err == nil {
break
}
Expand All @@ -541,7 +545,7 @@ func (b *BlockBuilder) fetchPartitions(ctx context.Context, partitions []int32)

// todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder.
// improve it to only fetch the offsets for the assigned partitions
func (b *BlockBuilder) getPartitionOffsets(ctx context.Context) (kadm.OffsetResponses, kadm.ListedOffsets, error) {
func (b *BlockBuilder) getPartitionOffsets(ctx context.Context, partitionIDs []int32) (kadm.OffsetResponses, kadm.ListedOffsets, error) {
var (
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
Expand All @@ -554,7 +558,7 @@ func (b *BlockBuilder) getPartitionOffsets(ctx context.Context) (kadm.OffsetResp
return nil, nil, err
}

endsOffsets, err := b.kadm.ListEndOffsets(ctx, topic)
endsOffsets, err := b.partitionOffsetClient.FetchPartitionsLastProducedOffsets(ctx, partitionIDs)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

k, address := testkafka.CreateCluster(t, 1, testTopic)
k, address := testkafka.CreateCluster(t, 100, testTopic)

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
Expand Down
2 changes: 2 additions & 0 deletions modules/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Generator struct {
kafkaStop func()
kafkaClient *ingest.Client
kafkaAdm *kadm.Client
partitionClient *ingest.PartitionOffsetClient
partitionRing ring.PartitionRingReader
partitionMtx sync.RWMutex
assignedPartitions []int32
Expand Down Expand Up @@ -213,6 +214,7 @@ func (g *Generator) starting(ctx context.Context) (err error) {
}

g.kafkaAdm = kadm.NewClient(g.kafkaClient.Client)
g.partitionClient = ingest.NewPartitionOffsetClient(g.kafkaClient.Client, g.cfg.Ingest.Kafka.Topic)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/generator_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var metricEnqueueTime = promauto.NewCounter(prometheus.CounterOpts{

func (g *Generator) startKafka() {
g.kafkaCh = make(chan *kgo.Record, g.cfg.IngestConcurrency)

// Create context that will be used to stop the goroutines.
var ctx context.Context
ctx, g.kafkaStop = context.WithCancel(context.Background())
Expand All @@ -36,7 +35,8 @@ func (g *Generator) startKafka() {

g.kafkaWG.Add(1)
go g.listenKafka(ctx)
ingest.ExportPartitionLagMetrics(ctx, g.kafkaAdm, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions, g.kafkaClient.ForceMetadataRefresh)

ingest.ExportPartitionLagMetrics(ctx, g.kafkaClient.Client, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions, g.kafkaClient.ForceMetadataRefresh)
}

func (g *Generator) stopKafka() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -84,6 +85,10 @@ type KafkaConfig struct {

TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`

// The fetch backoff config to use in the concurrent fetchers (when enabled). This setting
// is just used to change the default backoff in tests.
concurrentFetchersFetchBackoffConfig backoff.Config `yaml:"-"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/ingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
)

const (
Expand Down Expand Up @@ -43,7 +44,7 @@ var (
// Call ResetLagMetricsForRevokedPartitions when partitions are revoked to prevent exporting
stale data. For efficiency this is not detected automatically from changes in the assigned
// partition callback.
func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32, forceMetadataRefresh func()) {
func ExportPartitionLagMetrics(ctx context.Context, kclient *kgo.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32, forceMetadataRefresh func()) {
go func() {
var (
waitTime = time.Second * 15
Expand All @@ -54,6 +55,8 @@ func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log
MaxBackoff: waitTime,
MaxRetries: 5,
})
admClient = kadm.NewClient(kclient)
partitionClient = NewPartitionOffsetClient(kclient, topic)
)

for {
Expand All @@ -63,9 +66,10 @@ func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log
lag kadm.GroupLag
err error
)
assignedPartitions := getAssignedActivePartitions()
boff.Reset()
for boff.Ongoing() {
lag, err = getGroupLag(ctx, admClient, topic, group)
lag, err = getGroupLag(ctx, admClient, partitionClient, group, assignedPartitions)
if err == nil {
break
}
Expand All @@ -77,7 +81,7 @@ func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log
level.Error(log).Log("msg", "metric lag failed:", "err", err, "retries", boff.NumRetries())
continue
}
for _, p := range getAssignedActivePartitions() {
for _, p := range assignedPartitions {
l, ok := lag.Lookup(topic, p)
if ok {
metricPartitionLag.WithLabelValues(group, strconv.Itoa(int(p))).Set(float64(l.Lag))
Expand Down Expand Up @@ -115,7 +119,7 @@ func ResetLagMetricsForRevokedPartitions(group string, partitions []int32) {
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) {
func getGroupLag(ctx context.Context, admClient *kadm.Client, partitionClient *PartitionOffsetClient, group string, assignedPartitions []int32) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
Expand All @@ -126,11 +130,11 @@ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin
return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
}

startOffsets, err := admClient.ListStartOffsets(ctx, topic)
startOffsets, err := partitionClient.FetchPartitionsStartProducedOffsets(ctx, assignedPartitions)
if err != nil {
return nil, err
}
endOffsets, err := admClient.ListEndOffsets(ctx, topic)
endOffsets, err := partitionClient.FetchPartitionsLastProducedOffsets(ctx, assignedPartitions)
if err != nil {
return nil, err
}
Expand Down
119 changes: 119 additions & 0 deletions pkg/ingest/partition_offset_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
"context"
"fmt"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)

// Special timestamp values understood by Kafka's ListOffsets API: passing them
// in the request's Timestamp field selects a well-known offset instead of a
// timestamp-based lookup.
const (
	// kafkaOffsetStart is a special offset value that means the beginning of the partition
	// (the earliest offset still available).
	kafkaOffsetStart = int64(-2)

	// kafkaOffsetEnd is a special offset value that means the end of the partition
	// (the next offset to be produced, i.e. the high watermark).
	kafkaOffsetEnd = int64(-1)
)

// PartitionOffsetClient is a client used to read partition offsets.
// It issues ListOffsets requests directly through the underlying Kafka
// client, scoped to a single topic.
type PartitionOffsetClient struct {
	client *kgo.Client // low-level Kafka client used to issue sharded requests
	topic  string      // topic whose partition offsets are queried
}

// NewPartitionOffsetClient returns a PartitionOffsetClient that reads
// offsets for the given topic using the provided Kafka client.
func NewPartitionOffsetClient(client *kgo.Client, topic string) *PartitionOffsetClient {
	return &PartitionOffsetClient{
		client: client,
		topic:  topic,
	}
}

// FetchPartitionsLastProducedOffsets fetches and returns the last produced offsets for all input partitions. The returned offsets for each partition
// are guaranteed to be always updated (no stale or cached offsets returned).
// The Kafka client used under the hood may retry a failed request until the retry timeout is hit.
// Returns nil (with no error) when partitionIDs is empty.
func (p *PartitionOffsetClient) FetchPartitionsLastProducedOffsets(ctx context.Context, partitionIDs []int32) (kadm.ListedOffsets, error) {
	// Skip the lookup entirely if no partition was requested.
	if len(partitionIDs) == 0 {
		return nil, nil
	}

	return p.fetchPartitionsOffsets(ctx, kafkaOffsetEnd, partitionIDs)
}

// FetchPartitionsStartProducedOffsets fetches and returns the earliest available offsets for all input partitions. The returned offsets for each partition
// are guaranteed to be always updated (no stale or cached offsets returned).
// The Kafka client used under the hood may retry a failed request until the retry timeout is hit.
// Returns nil (with no error) when partitionIDs is empty.
func (p *PartitionOffsetClient) FetchPartitionsStartProducedOffsets(ctx context.Context, partitionIDs []int32) (kadm.ListedOffsets, error) {
	// Skip the lookup entirely if no partition was requested.
	if len(partitionIDs) == 0 {
		return nil, nil
	}

	return p.fetchPartitionsOffsets(ctx, kafkaOffsetStart, partitionIDs)
}

// fetchPartitionsOffsets fetches and returns offsets for the specified partitions using Kafka's ListOffsets API.
// The fetchOffset parameter determines which offsets to retrieve:
//   - kafkaOffsetStart (-2): earliest available offset in each partition
//   - kafkaOffsetEnd (-1): next offset to be produced (high watermark) in each partition
//   - specific timestamp: offset of the first message at or after the given timestamp
//
// This function returns an error if it fails to get the offset of any partition (no partial results are returned).
// The Kafka ListOffsets API is documented here: https://github.com/twmb/franz-go/blob/master/pkg/kmsg/generated.go#L5781-L5808
func (p *PartitionOffsetClient) fetchPartitionsOffsets(ctx context.Context, fetchOffset int64, partitionIDs []int32) (kadm.ListedOffsets, error) {
	// Build one request entry per partition, each asking for the same special offset/timestamp.
	topicReq := kmsg.NewListOffsetsRequestTopic()
	topicReq.Topic = p.topic
	topicReq.Partitions = make([]kmsg.ListOffsetsRequestTopicPartition, 0, len(partitionIDs))
	for _, id := range partitionIDs {
		part := kmsg.NewListOffsetsRequestTopicPartition()
		part.Partition = id
		part.Timestamp = fetchOffset
		topicReq.Partitions = append(topicReq.Partitions, part)
	}

	req := kmsg.NewPtrListOffsetsRequest()
	req.IsolationLevel = 0 // 0 means READ_UNCOMMITTED.
	req.Topics = []kmsg.ListOffsetsRequestTopic{topicReq}

	// Partition leaders may live on different brokers, so the request is sharded;
	// each broker's answer is merged into a single result set.
	result := kadm.ListedOffsets{
		p.topic: make(map[int32]kadm.ListedOffset, len(partitionIDs)),
	}

	for _, sh := range p.client.RequestSharded(ctx, req) {
		if sh.Err != nil {
			return nil, sh.Err
		}

		resp := sh.Resp.(*kmsg.ListOffsetsResponse)
		switch {
		case len(resp.Topics) != 1:
			return nil, fmt.Errorf("unexpected number of topics in the response (expected: %d, got: %d)", 1, len(resp.Topics))
		case resp.Topics[0].Topic != p.topic:
			return nil, fmt.Errorf("unexpected topic in the response (expected: %s, got: %s)", p.topic, resp.Topics[0].Topic)
		}

		for _, part := range resp.Topics[0].Partitions {
			// Per-partition errors abort the whole call: no partial results.
			if err := kerr.ErrorForCode(part.ErrorCode); err != nil {
				return nil, err
			}

			result[p.topic][part.Partition] = kadm.ListedOffset{
				Topic:       p.topic,
				Partition:   part.Partition,
				Timestamp:   part.Timestamp,
				Offset:      part.Offset,
				LeaderEpoch: part.LeaderEpoch,
			}
		}
	}

	return result, nil
}
Loading
Loading