215 changes: 144 additions & 71 deletions modules/blockbuilder/blockbuilder.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
@@ -175,55 +176,87 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
return nil
}

type PartitionStatus struct {
partition int32
hasRecords bool
startOffset, endOffset int64
}

func (p PartitionStatus) lag() int64 {
return p.endOffset - p.startOffset
}

func (p PartitionStatus) getStartOffset() kgo.Offset {
if p.startOffset >= 0 {
return kgo.NewOffset().At(p.startOffset)
}
return kgo.NewOffset().AtStart()
}
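
For illustration, a minimal sketch of how these helpers behave, assuming it runs in the blockbuilder package next to PartitionStatus; the offsets below are invented, not taken from this PR:

```go
// Committed partition: lag is simply endOffset - startOffset.
committed := PartitionStatus{partition: 3, hasRecords: true, startOffset: 100, endOffset: 250}
lag := committed.lag()              // 250 - 100 = 150 records still to consume
start := committed.getStartOffset() // kgo.NewOffset().At(100)

// No prior commit: the -1 sentinel is kept, so consumption falls back
// to the beginning of the partition.
fresh := PartitionStatus{partition: 4, startOffset: -1, endOffset: 50}
freshStart := fresh.getStartOffset() // kgo.NewOffset().AtStart()

_, _, _ = lag, start, freshStart
```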

func (b *BlockBuilder) running(ctx context.Context) error {
// Initial delay
waitTime := 0 * time.Second
consumeCycleBackoffFactor := 0.8

for {
select {
case <-time.After(waitTime):
err := b.consume(ctx)
if err != nil {
level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
}
startTime := time.Now()
more, err := b.consume(ctx)
if err != nil {
level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
}
elapsed := time.Since(startTime)
waitTime := b.cfg.ConsumeCycleDuration

// Real delay on subsequent
waitTime = b.cfg.ConsumeCycleDuration
case <-ctx.Done():
return nil
if more {
waitTime = time.Duration(float64(waitTime) * consumeCycleBackoffFactor)
}

remainingWait := waitTime - elapsed
if remainingWait > 0 {
level.Info(b.logger).Log("msg", "cycle completed", "elapsed", elapsed, "remaining_wait", waitTime, "more_records", more)
select {
case <-time.After(remainingWait):
case <-ctx.Done():
return nil
}
}
}
}
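
The new timing logic measures each cycle, subtracts the elapsed time from ConsumeCycleDuration, and shortens the wait by the 0.8 backoff factor when records remain. A hedged sketch of just that arithmetic (the helper name and inputs are illustrative, not part of the diff):

```go
// nextWait returns how long the loop should sleep before the next consume cycle.
func nextWait(cycleDuration, elapsed time.Duration, moreRecords bool) time.Duration {
	wait := cycleDuration
	if moreRecords {
		// Some partition still has data: shrink the idle window to catch up faster.
		wait = time.Duration(float64(wait) * 0.8)
	}
	if remaining := wait - elapsed; remaining > 0 {
		return remaining
	}
	return 0 // the cycle already overran the target interval, start again immediately
}
```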

func (b *BlockBuilder) consume(ctx context.Context) error {
// It consumes a single cycle per partition, prioritizing the ones with more lag
func (b *BlockBuilder) consume(ctx context.Context) (bool, error) {
var (
end = time.Now()
partitions = b.getAssignedActivePartitions()

more bool
)
level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", getActivePartitions(partitions))
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

// Clear all previous remnants
err := b.wal.Clear()
if err != nil {
return err
return false, err
}

for _, partition := range partitions {
// Consume partition while data remains.
// TODO - round-robin one consumption per partition instead to equalize catch-up time.
for {
more, err := b.consumePartition(ctx, partition, end)
if err != nil {
return err
}

if !more {
break
}
}
sortedPartitions, err := b.getSortedPartitions(ctx, partitions)
if err != nil {
return false, err
}
// Partitions with more lag are prioritized
for _, partition := range sortedPartitions {
if !partition.hasRecords { // No records, we can skip the partition
continue
}
moreRecords, err := b.consumePartition(ctx, partition)
if err != nil {
return false, err
}
if moreRecords {
more = true
}

return nil
}
return more, nil
}

func getActivePartitions(partitions []int32) string {
@@ -234,52 +267,91 @@ func getActivePartitions(partitions []int32) string {
return strings.Join(strArr, ",")
}

func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, overallEnd time.Time) (more bool, err error) {
defer func(t time.Time) {
metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds())
}(time.Now())

var (
dur = b.cfg.ConsumeCycleDuration
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
partLabel = strconv.Itoa(int(partition))
startOffset kgo.Offset
init bool
writer *writer
lastRec *kgo.Record
end time.Time
)
// It fetches all the offsets for the blockbuilder topic. For each owned partition it looks up the last committed record and the
// end record offset, and then sorts the partitions by lag.
func (b *BlockBuilder) getSortedPartitions(ctx context.Context, partitions []int32) ([]PartitionStatus, error) {
ps := make([]PartitionStatus, 0, len(partitions))
group := b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
topic := b.cfg.IngestStorageConfig.Kafka.Topic

commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic)
if err != nil {
return false, err
return nil, err
}
if err := commits.Error(); err != nil {
return false, err
return nil, err
}

lastCommit, ok := commits.Lookup(topic, partition)
if ok && lastCommit.At >= 0 {
startOffset = kgo.NewOffset().At(lastCommit.At)
} else {
startOffset = kgo.NewOffset().AtStart()
for _, partition := range partitions {
p, err := b.getPartitionStatus(ctx, partition, commits)
if err != nil {
return nil, err
}
ps = append(ps, p)
}

sort.Slice(ps, func(i, j int) bool {
return ps[i].lag() > ps[j].lag()
})

return ps, nil
}
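
To make the ordering concrete, a small hypothetical run of the same sort with three partitions (offsets invented):

```go
ps := []PartitionStatus{
	{partition: 0, hasRecords: true, startOffset: 10, endOffset: 20},  // lag 10
	{partition: 1, hasRecords: false, startOffset: -1, endOffset: -1}, // lag 0, skipped by consume()
	{partition: 2, hasRecords: true, startOffset: 5, endOffset: 500},  // lag 495
}
sort.Slice(ps, func(i, j int) bool { return ps[i].lag() > ps[j].lag() })
// Consumption order: partition 2, then 0; partition 1 has no records and is skipped.
```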

func (b *BlockBuilder) getPartitionStatus(ctx context.Context, partition int32, commits kadm.OffsetResponses) (PartitionStatus, error) {
var (
topic = b.cfg.IngestStorageConfig.Kafka.Topic
partitionStatus = PartitionStatus{partition: partition, startOffset: -1, endOffset: -1}
)

lastCommit, found := commits.Lookup(topic, partition)
if found {
partitionStatus.startOffset = lastCommit.At
}

ends, err := b.kadm.ListEndOffsets(ctx, topic)
if err != nil {
return false, err
return partitionStatus, err
}
if err := ends.Error(); err != nil {
return false, err
return partitionStatus, err
}
lastPossibleMessage, lastPossibleMessageFound := ends.Lookup(topic, partition)
lastRecord, found := ends.Lookup(topic, partition)

if !found {
return partitionStatus, nil
}
partitionStatus.endOffset = lastRecord.Offset
partitionStatus.hasRecords = true

return partitionStatus, nil
}

func (b *BlockBuilder) consumePartition(ctx context.Context, ps PartitionStatus) (more bool, err error) {
defer func(t time.Time) {
metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(ps.partition))).Observe(time.Since(t).Seconds())
}(time.Now())

var (
dur = b.cfg.ConsumeCycleDuration
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
partLabel = strconv.Itoa(int(ps.partition))
startOffset kgo.Offset
init bool
writer *writer
lastRec *kgo.Record
end time.Time
processedRecords int
)

startOffset = ps.getStartOffset()

level.Info(b.logger).Log(
"msg", "consuming partition",
"partition", partition,
"commit_offset", lastCommit.At,
"partition", ps.partition,
"commit_offset", ps.startOffset,
"start_offset", startOffset,
"lag", ps.lag(),
)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
@@ -288,10 +360,10 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov
// from one partition at a time. I.e. when this partition is consumed, we start consuming the next one.
b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{
topic: {
partition: startOffset,
ps.partition: startOffset,
},
})
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}})
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {ps.partition}})

outer:
for {
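
The assign/poll/release pattern used here follows franz-go's direct partition consumption. A condensed, hedged sketch of the same mechanism, assuming an already-configured *kgo.Client; the topic, partition, and offset below are invented:

```go
topic, partition := "tempo-ingest", int32(3) // illustrative names
startOffset := kgo.NewOffset().At(100)       // e.g. the last committed offset

// Pin the partition to the chosen offset; this triggers a partition assignment.
client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
	topic: {partition: startOffset},
})
// Release it once this partition's section is done, so the next one can be consumed.
defer client.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

fetches := client.PollFetches(ctx)
fetches.EachRecord(func(rec *kgo.Record) {
	// Process rec.Key / rec.Value and remember rec.Offset for the next commit.
})
```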
@@ -332,30 +404,22 @@ outer:
if !init {
end = rec.Timestamp.Add(dur) // When block will be cut
metricPartitionLagSeconds.WithLabelValues(partLabel).Set(time.Since(rec.Timestamp).Seconds())
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.WAL.IngestionSlack, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
writer = newPartitionSectionWriter(b.logger, uint64(ps.partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.WAL.IngestionSlack, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
init = true
}

if rec.Timestamp.After(end) {
// Cut this block but continue only if we have at least another full cycle
if overallEnd.Sub(rec.Timestamp) >= dur {
more = true
}
break outer
}

if rec.Timestamp.After(overallEnd) {
break outer
}

err := b.pushTraces(rec.Timestamp, rec.Key, rec.Value, writer)
if err != nil {
return false, err
}

processedRecords++
lastRec = rec

if lastPossibleMessageFound && lastRec.Offset >= lastPossibleMessage.Offset-1 {
if lastRec.Offset >= ps.endOffset-1 {
// We reached the end so break now and avoid another poll which is expected to be empty.
break outer
}
@@ -364,9 +428,12 @@

if lastRec == nil {
// Received no data

level.Info(b.logger).Log(
"msg", "no data",
"partition", partition,
"partition", ps.partition,
"commit_offset", ps.startOffset,
"start_offset", startOffset,
)
return false, nil
}
@@ -385,10 +452,16 @@ outer:
return false, err
}

if lastRec.Offset < ps.endOffset {
more = true
}

level.Info(b.logger).Log(
"msg", "successfully committed offset to kafka",
"partition", partition,
"partition", ps.partition,
"last_record", lastRec.Offset,
"more_records_left", more,
"processed_records", processedRecords,
)

return more, nil
5 changes: 3 additions & 2 deletions modules/blockbuilder/blockbuilder_test.go
@@ -227,7 +227,8 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {

// Wait for record to be consumed and committed.
require.Eventually(t, func() bool {
return kafkaCommits.Load() == 2
l := kafkaCommits.Load()
return l == 2
}, time.Minute, time.Second)

// Wait for the block to be flushed.
@@ -552,7 +553,7 @@ func BenchmarkBlockBuilder(b *testing.B) {
}
b.StartTimer()

err = bb.consume(ctx)
_, err = bb.consume(ctx)
require.NoError(b, err)

b.SetBytes(int64(size))