Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
* [BUGFIX] Fixes where ingester may leave file open [#570](https://github.com/grafana/tempo/pull/570)
* [BUGFIX] Fixes a bug where some blocks were not searched due to query sharding and randomness in blocklist poll. [#583](https://github.com/grafana/tempo/pull/583)
* [BUGFIX] Fixes issue where wal was deleted before successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593)

## v0.6.0

Expand Down
123 changes: 90 additions & 33 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ingester
import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"strconv"
"time"
Expand Down Expand Up @@ -38,9 +38,10 @@ var (
)

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
flushBackoff = 1 * time.Second
initialBackoff = 1 * time.Second
flushJitter = 10 * time.Second
maxBackoff = time.Minute
maxCompleteAttempts = 10
Comment thread
mdisibio marked this conversation as resolved.
Outdated
)

const (
Expand Down Expand Up @@ -99,18 +100,22 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
}

type flushOp struct {
kind int
from int64
userID string
blockID uuid.UUID
kind int
at time.Time // When to execute
attempts uint
backoff time.Duration
userID string
blockID uuid.UUID
}

func (o *flushOp) Key() string {
return o.userID + "/" + strconv.Itoa(o.kind) + "/" + o.blockID.String()
}

// Priority orders entries in the queue. The larger the number the higher the priority, so inverted here to
// prioritize entries with earliest timestamps.
func (o *flushOp) Priority() int64 {
return -o.from
return -o.at.Unix()
}

// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
Expand Down Expand Up @@ -138,9 +143,9 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

if blockID != uuid.Nil {
i.flushQueues.Enqueue(&flushOp{
// jitter to help when flushing many instances at the same time
i.enqueueWithJitter(&flushOp{
kind: opKindComplete,
from: math.MaxInt64,
userID: instance.instanceID,
blockID: blockID,
})
Expand All @@ -165,49 +170,68 @@ func (i *Ingester) flushLoop(j int) {
return
}
op := o.(*flushOp)
op.attempts++

retry := false

var completeBlockID uuid.UUID
var err error
if op.kind == opKindComplete {
level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, exists := i.getInstanceByID(op.userID)
if !exists {
// instance no longer exists? that's bad, log and continue
level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
handleFlushError(op, err)
continue
}

completeBlockID, err = instance.CompleteBlock(op.blockID)
if completeBlockID != uuid.Nil {
err = instance.CompleteBlock(op.blockID)
Comment thread
mdisibio marked this conversation as resolved.
if err != nil {
handleFlushError(op, err)

if op.attempts >= maxCompleteAttempts {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())

err = instance.ClearCompletingBlock(op.blockID)
if err != nil {
// Failure to delete the WAL doesn't prevent flushing the bloc
handleFlushError(op, err)
}
} else {
retry = true
}
} else {
// add a flushOp for the block we just completed
i.flushQueues.Enqueue(&flushOp{
kind: opKindFlush,
from: time.Now().Unix(),
at: time.Now(),
userID: instance.instanceID,
blockID: completeBlockID,
blockID: op.blockID,
})
}

} else {
level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
level.Info(log.Logger).Log("msg", "flushing block", "userid", op.userID, "block", op.blockID.String())
Comment thread
joe-elliott marked this conversation as resolved.

err = i.flushBlock(op.userID, op.blockID)
err := i.flushBlock(op.userID, op.blockID)
if err != nil {
handleFlushError(op, err)
retry = true
}
}

if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
"op", op.kind, "block", op.blockID.String(), "err", err)
metricFailedFlushes.Inc()
// re-queue op with backoff
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
if retry {
i.requeue(op)
} else {
i.flushQueues.Clear(op)
}

i.flushQueues.Clear(op)
}
}

func handleFlushError(op *flushOp, err error) {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts, "err", err)
metricFailedFlushes.Inc()
}

func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
instance, err := i.getOrCreateInstance(userID)
if err != nil {
Expand All @@ -229,10 +253,43 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
if err != nil {
return err
}
// Delete original wal only after successful flush
instance.ClearCompletingBlock(blockID)
metricBlocksFlushed.Inc()
} else {
return fmt.Errorf("error getting block to flush")
}

return nil
}

func (i *Ingester) enqueueWithJitter(op *flushOp) {
delay := time.Duration(rand.Float32() * float32(flushJitter))

op.at = time.Now().Add(delay)

go func() {
time.Sleep(delay)
i.flushQueues.Enqueue(op)
}()
}

func (i *Ingester) requeue(op *flushOp) {
op.backoff *= 2
if op.backoff < initialBackoff {
op.backoff = initialBackoff
}
if op.backoff > maxBackoff {
op.backoff = maxBackoff
}

op.at = time.Now().Add(op.backoff)

level.Info(log.WithUserID(op.userID, log.Logger)).Log("msg", "retrying op in flushQueue",
"op", op.kind, "block", op.blockID.String(), "backoff", op.backoff)

go func() {
time.Sleep(op.backoff)
i.flushQueues.Requeue(op)
}()
}
45 changes: 30 additions & 15 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,21 +281,17 @@ func (i *Ingester) replayWal() error {
blocks, err := i.store.WAL().AllBlocks()
// todo: should this fail startup?
if err != nil {
level.Error(log.Logger).Log("msg", "error beginning wal replay", "err", err)
return nil
}

level.Info(log.Logger).Log("msg", "beginning wal replay", "numBlocks", len(blocks))

for _, b := range blocks {
tenantID := b.TenantID()
level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID)
level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID, "block", b.BlockID())

instance, err := i.getOrCreateInstance(tenantID)
if err != nil {
return err
}

err = i.replayBlock(b, instance)
err = i.replayBlock(b)
if err != nil {
// there was an error, log and keep on keeping on
level.Error(log.Logger).Log("msg", "error replaying block. removing", "error", err)
Expand All @@ -306,29 +302,48 @@ func (i *Ingester) replayWal() error {
}
}

level.Info(log.Logger).Log("msg", "wal replay complete")

return nil
}

func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock, instance *instance) error {
func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error {
iterator, err := b.Iterator()
if err != nil {
return err
}
defer iterator.Close()

ctx := context.Background()

// Pull first entry to see if block has any data
id, obj, err := iterator.Next(ctx)
if err != nil {
return err
}
if id == nil {
// Block is empty
return nil
}

// Only create instance for tenant now that we know data exists
instance, err := i.getOrCreateInstance(b.TenantID())
if err != nil {
return err
}

for {
id, obj, err := iterator.Next(ctx)
if id == nil {
break
}
// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
writeID := append([]byte(nil), id...)
err = instance.PushBytes(context.Background(), writeID, obj)
if err != nil {
return err
}

// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
writeID := append([]byte(nil), id...)
err = instance.PushBytes(context.Background(), writeID, obj)
id, obj, err = iterator.Next(ctx)
if id == nil {
break
}
if err != nil {
return err
}
Expand Down
33 changes: 22 additions & 11 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes
return uuid.Nil, nil
}

// CompleteBlock() moves a completingBlock to a completeBlock
func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
// CompleteBlock() moves a completingBlock to a completeBlock. The new completeBlock has the same ID
func (i *instance) CompleteBlock(blockID uuid.UUID) error {
i.blocksMtx.Lock()

var completingBlock *wal.AppendBlock
Expand All @@ -174,35 +174,46 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
i.blocksMtx.Unlock()

if completingBlock == nil {
return uuid.Nil, fmt.Errorf("error finding completingBlock")
return fmt.Errorf("error finding completingBlock")
}

// potentially long running operation placed outside blocksMtx
completeBlock, err := i.writer.CompleteBlock(completingBlock, i)
if err != nil {
metricFailedFlushes.Inc()
level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err)
return uuid.Nil, err
return err
}

// remove completingBlock and add completeBlock
i.blocksMtx.Lock()
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.blocksMtx.Unlock()

return nil
}

// nolint:interfacer
func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error {
i.blocksMtx.Lock()
var completingBlock *wal.AppendBlock
for j, iterBlock := range i.completingBlocks {
if iterBlock.BlockID() == blockID {
completingBlock = iterBlock
i.completingBlocks = append(i.completingBlocks[:j], i.completingBlocks[j+1:]...)
break
}
}
completeBlockID := completeBlock.BlockMeta().BlockID
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.blocksMtx.Unlock()

err = completingBlock.Clear()
if err != nil {
level.Error(log.Logger).Log("msg", "Error clearing wal", "tenantID", i.instanceID, "err", err)
if completingBlock != nil {
return completingBlock.Clear()
//if err != nil {
// return err
//level.Error(log.Logger).Log("msg", "Error clearing wal", "tenantID", i.instanceID, "blockID", blockID.String(), "err", err)
//}
}

return completeBlockID, nil
return fmt.Errorf("Error finding wal completingBlock to clear")
}

// GetBlockToBeFlushed gets a list of blocks that can be flushed to the backend
Expand Down
Loading