Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
* [ENHANCEMENT] Increase query-frontend default batch size [#4844](https://github.com/grafana/tempo/pull/4844) (@javiermolinar)
* [ENHANCEMENT] Improve TraceQL perf by reverting EqualRowNumber to an inlineable function. [#4705](https://github.com/grafana/tempo/pull/4705) (@joe-elliott)
* [ENHANCEMENT] Rhythm: fair partition consumption in blockbuilders [#4655](https://github.com/grafana/tempo/pull/4655) (@javiermolinar)
* [ENHANCEMENT] Rhythm: retry on commit error [#4874](https://github.com/grafana/tempo/pull/4874) (@javiermolinar)
* [ENHANCEMENT] Skip creating one span-traces for every pushed spans in metrics generator [#4844](https://github.com/grafana/tempo/pull/4844) (@javiermolinar)
* [ENHANCEMENT] TraceQL: add support for querying by parent span id [#4692](https://github.com/grafana/tempo/pull/4692) (@ie-pham)
* [ENHANCEMENT] metrics-generator: allow skipping localblocks and consuming from a different source of data [#4686](https://github.com/grafana/tempo/pull/4686) (@flxbk)
Expand Down
36 changes: 30 additions & 6 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,11 @@ outer:
}

offset := kadm.NewOffsetFromRecord(lastRec)
offsets := make(kadm.Offsets)
offsets.Add(offset)
resp, err := b.kadm.CommitOffsets(ctx, group, offsets) // TODO - Retry commit
err = b.commitOffset(ctx, offset, group, ps.partition)
if err != nil {
return time.Time{}, -1, err
}
if err := resp.Error(); err != nil {
return time.Time{}, -1, err
}

level.Info(b.logger).Log(
"msg", "successfully committed offset to kafka",
"partition", ps.partition,
Expand All @@ -425,6 +421,34 @@ outer:
return lastRec.Timestamp, offset.At, nil
}

// commitOffset commits the given offset for the consumer group to Kafka,
// retrying transient failures with exponential backoff (100ms..1m, up to 10
// retries). It returns nil as soon as one commit succeeds. If the retries are
// exhausted or ctx is canceled, the returned error includes both the backoff
// cause and the last commit error so the actual broker failure is not lost.
func (b *BlockBuilder) commitOffset(ctx context.Context, offset kadm.Offset, group string, partition int32) error {
	offsets := make(kadm.Offsets)
	offsets.Add(offset)

	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Minute,
		MaxRetries: 10,
	})
	var lastErr error
	for boff.Ongoing() {
		lastErr = b.kadm.CommitAllOffsets(ctx, group, offsets)
		if lastErr == nil {
			return nil
		}
		level.Warn(b.logger).Log(
			"msg", "failed to commit offset, retrying",
			"err", lastErr,
			"partition", partition,
			"commit_offset", offset.At,
		)
		boff.Wait()
	}
	// Retries exhausted or context canceled: surface the backoff cause and the
	// most recent commit error instead of dropping the latter.
	return fmt.Errorf("error committing offset %d for partition %d, it won't be retried: %w (last commit error: %v)", offset.At, partition, boff.ErrCause(), lastErr)
}

func formatActivePartitions(partitions []int32) string {
var strArr []string
for _, v := range partitions {
Expand Down
118 changes: 118 additions & 0 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -354,6 +355,123 @@ func TestBlockbuilder_committingFails(t *testing.T) {
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

// TestBlockbuilder_retries_on_retriable_commit_error verifies that the block
// builder retries an offset commit when the broker answers the first attempt
// with a retryable partition-level error code, and that exactly one block is
// flushed and the final offset is committed once the retry succeeds.
func TestBlockbuilder_retries_on_retriable_commit_error(t *testing.T) {
	ctx, cancel := context.WithCancelCause(context.Background())
	t.Cleanup(func() { cancel(errors.New("test done")) })

	cluster, address := testkafka.CreateCluster(t, 1, "test-topic")

	commitAttempts := atomic.NewInt32(0)
	cluster.ControlKey(kmsg.OffsetCommit, func(req kmsg.Request) (kmsg.Response, error, bool) {
		attempt := commitAttempts.Inc()

		// Let the fake cluster handle every attempt after the first.
		if attempt != 1 {
			return nil, nil, false
		}

		// Fail the first attempt with a retryable partition error.
		res := kmsg.NewOffsetCommitResponse()
		res.Version = req.GetVersion()
		res.Topics = []kmsg.OffsetCommitResponseTopic{
			{
				Topic: testTopic,
				Partitions: []kmsg.OffsetCommitResponseTopicPartition{
					{
						Partition: 0,
						ErrorCode: kerr.NotEnoughReplicas.Code, // Retryable error code
					},
				},
			},
		}
		return &res, nil, true
	})

	store := newStore(ctx, t)
	cfg := blockbuilderConfig(t, address, []int32{0})
	logger := test.NewTestingLogger(t)

	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
	produced := sendReq(t, ctx, client, util.FakeTenantID)
	lastOffset := produced[len(produced)-1].Offset

	b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
	require.NoError(t, err)

	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
	t.Cleanup(func() {
		require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
	})

	// The first commit fails; wait until the retried commit has been attempted.
	require.Eventually(t, func() bool {
		return commitAttempts.Load() == 2
	}, time.Minute, time.Second)

	// Despite the retry, exactly one block should have been written.
	require.Eventually(t, func() bool {
		return len(store.BlockMetas(util.FakeTenantID)) == 1
	}, time.Minute, time.Second)

	requireLastCommitEquals(t, ctx, client, lastOffset+1)
}

// TestBlockbuilder_retries_on_commit_error verifies that the block builder
// retries an offset commit when the commit request itself fails with a
// transport-level error, and that exactly one block is flushed and the final
// offset is committed once the retry succeeds.
func TestBlockbuilder_retries_on_commit_error(t *testing.T) {
	ctx, cancel := context.WithCancelCause(context.Background())
	t.Cleanup(func() { cancel(errors.New("test done")) })

	cluster, address := testkafka.CreateCluster(t, 1, "test-topic")

	commitAttempts := atomic.NewInt32(0)
	cluster.ControlKey(kmsg.OffsetCommit, func(req kmsg.Request) (kmsg.Response, error, bool) {
		attempt := commitAttempts.Inc()

		// Let the fake cluster handle every attempt after the first.
		if attempt != 1 {
			return nil, nil, false
		}

		// Fail the first attempt with a request-level error.
		res := kmsg.NewOffsetCommitResponse()
		res.Version = req.GetVersion()
		res.Topics = []kmsg.OffsetCommitResponseTopic{
			{
				Topic: testTopic,
				Partitions: []kmsg.OffsetCommitResponseTopicPartition{
					{
						Partition: 0,
					},
				},
			},
		}
		return &res, fmt.Errorf("error committing offset"), true
	})

	store := newStore(ctx, t)
	cfg := blockbuilderConfig(t, address, []int32{0})
	logger := test.NewTestingLogger(t)

	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
	produced := sendReq(t, ctx, client, util.FakeTenantID)
	lastOffset := produced[len(produced)-1].Offset

	b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
	require.NoError(t, err)

	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
	t.Cleanup(func() {
		require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
	})

	// The first commit fails; wait until the retried commit has been attempted.
	require.Eventually(t, func() bool {
		return commitAttempts.Load() == 2
	}, time.Minute, time.Second)

	// Despite the retry, exactly one block should have been written.
	require.Eventually(t, func() bool {
		return len(store.BlockMetas(util.FakeTenantID)) == 1
	}, time.Minute, time.Second)

	requireLastCommitEquals(t, ctx, client, lastOffset+1)
}

// TestBlockbuilder_noDoubleConsumption verifies that records are not consumed twice when there are no more records in the partition.
// This test ensures that the BlockBuilder correctly commits the offset as lastRec.Offset + 1 instead of just lastRec.Offset.
func TestBlockbuilder_noDoubleConsumption(t *testing.T) {
Expand Down