Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
* [BUGFIX] Fixes where ingester may leave file open [#570](https://github.com/grafana/tempo/pull/570)
* [BUGFIX] Fixes a bug where some blocks were not searched due to query sharding and randomness in blocklist poll. [#583](https://github.com/grafana/tempo/pull/583)
* [BUGFIX] Fixes issue where the WAL was deleted before a successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593)

## v0.6.0

Expand Down
173 changes: 137 additions & 36 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ingester
import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"strconv"
"time"
Expand Down Expand Up @@ -38,9 +38,10 @@ var (
)

const (
// NOTE(review): diff residue — the comment and constant below are the
// removed (pre-change) retry policy; the constants after them replace it.
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
flushBackoff = 1 * time.Second
initialBackoff = 30 * time.Second // first retry delay for a failed op; doubled on each retry (see requeue)
flushJitter = 10 * time.Second // max random delay added when enqueueing non-immediate complete ops (see enqueue)
maxBackoff = 120 * time.Second // upper clamp on the exponential retry delay
maxCompleteAttempts = 3 // complete attempts before the WAL block is deleted and the op abandoned
)

const (
Expand Down Expand Up @@ -99,18 +100,22 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
}

type flushOp struct {
// NOTE(review): diff residue — the four fields below are the removed
// (pre-change) definition; the fields that follow are the replacement.
kind int
from int64
userID string
blockID uuid.UUID
kind int
at time.Time // When to execute
attempts uint // times this op has been dequeued; compared against maxCompleteAttempts
backoff time.Duration // current retry delay; doubled and clamped by requeue
userID string
blockID uuid.UUID
}

// Key returns the queue-deduplication key for this op: tenant ID, op
// kind, and block ID joined with slashes.
func (o *flushOp) Key() string {
	return fmt.Sprintf("%s/%d/%s", o.userID, o.kind, o.blockID.String())
}

// Priority orders entries in the queue. The larger the number the higher the priority, so inverted here to
// prioritize entries with earliest timestamps.
func (o *flushOp) Priority() int64 {
// NOTE(review): diff residue — the line below is the removed (pre-change) implementation.
return -o.from
// Negated scheduled-time Unix timestamp: earlier ops sort to higher priority.
return -o.at.Unix()
}

// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
Expand Down Expand Up @@ -138,12 +143,13 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

if blockID != uuid.Nil {
i.flushQueues.Enqueue(&flushOp{
// jitter to help when flushing many instances at the same time
// no jitter if immediate (initiated via /flush handler for example)
i.enqueue(&flushOp{
kind: opKindComplete,
from: math.MaxInt64,
userID: instance.instanceID,
blockID: blockID,
})
}, !immediate)
}

// dump any blocks that have been flushed for awhile
Expand All @@ -165,49 +171,80 @@ func (i *Ingester) flushLoop(j int) {
return
}
op := o.(*flushOp)
op.attempts++

retry := false

var completeBlockID uuid.UUID
var err error
if op.kind == opKindComplete {
// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
if i.flushQueues.IsStopped() {
handleAbandonedOp(op)
continue
}

level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, exists := i.getInstanceByID(op.userID)
if !exists {
// instance no longer exists? that's bad, log and continue
level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
handleFailedOp(op, err)
continue
}

completeBlockID, err = instance.CompleteBlock(op.blockID)
if completeBlockID != uuid.Nil {
err = instance.CompleteBlock(op.blockID)
Comment thread
mdisibio marked this conversation as resolved.
if err != nil {
handleFailedOp(op, err)

if op.attempts >= maxCompleteAttempts {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())

err = instance.ClearCompletingBlock(op.blockID)
if err != nil {
// Failure to delete the WAL doesn't prevent flushing the block
handleFailedOp(op, err)
}
} else {
retry = true
}
} else {
// add a flushOp for the block we just completed
i.flushQueues.Enqueue(&flushOp{
// No delay
i.enqueue(&flushOp{
kind: opKindFlush,
from: time.Now().Unix(),
userID: instance.instanceID,
blockID: completeBlockID,
})
blockID: op.blockID,
}, false)
}

} else {
level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
level.Info(log.Logger).Log("msg", "flushing block", "userid", op.userID, "block", op.blockID.String())
Comment thread
joe-elliott marked this conversation as resolved.

err = i.flushBlock(op.userID, op.blockID)
err := i.flushBlock(op.userID, op.blockID)
if err != nil {
handleFailedOp(op, err)
retry = true
}
}

if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
"op", op.kind, "block", op.blockID.String(), "err", err)
metricFailedFlushes.Inc()
// re-queue op with backoff
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
if retry {
i.requeue(op)
} else {
i.flushQueues.Clear(op)
}

i.flushQueues.Clear(op)
}
}

// handleFailedOp records a failed flush-queue operation: it logs the
// error with tenant context and increments the failed-flush metric.
func handleFailedOp(op *flushOp, err error) {
	logger := log.WithUserID(op.userID, log.Logger)
	level.Error(logger).Log(
		"msg", "error performing op in flushQueue",
		"op", op.kind,
		"block", op.blockID.String(),
		"attempts", op.attempts,
		"err", err,
	)
	metricFailedFlushes.Inc()
}

// handleAbandonedOp logs an op that is dropped because the ingester is
// shutting down. The op is not retried or requeued.
func handleAbandonedOp(op *flushOp) {
	logger := log.WithUserID(op.userID, log.Logger)
	level.Info(logger).Log(
		"msg", "Abandoning op in flush queue because ingester is shutting down",
		"op", op.kind,
		"block", op.blockID.String(),
		"attempts", op.attempts,
	)
}

func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
instance, err := i.getOrCreateInstance(userID)
if err != nil {
Expand All @@ -229,10 +266,74 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
if err != nil {
return err
}

// Delete original wal only after successful flush
err = instance.ClearCompletingBlock(blockID)
if err != nil {
// Error deleting wal doesn't fail the flush
level.Error(log.Logger).Log("msg", "Error clearing wal", "userID", userID, "blockID", blockID.String(), "err", err)
metricFailedFlushes.Inc()
}

metricBlocksFlushed.Inc()
} else {
return fmt.Errorf("error getting block to flush")
}

return nil
}

func (i *Ingester) enqueue(op *flushOp, jitter bool) {
delay := time.Duration(0)

if jitter {
delay = time.Duration(rand.Float32() * float32(flushJitter))
}

op.at = time.Now().Add(delay)

go func() {
time.Sleep(delay)

// Check if shutdown initiated
if i.flushQueues.IsStopped() {
Comment thread
mdisibio marked this conversation as resolved.
handleAbandonedOp(op)
return
}

err := i.flushQueues.Enqueue(op)
if err != nil {
handleFailedOp(op, err)
}
}()
}

// requeue reschedules a failed op with exponential backoff: the delay
// doubles on each retry and is clamped to [initialBackoff, maxBackoff].
// The wait is served by sleeping in a goroutine so the flush loop is
// never blocked while a retry is pending.
func (i *Ingester) requeue(op *flushOp) {
	// Double the previous backoff, then clamp it into the allowed window.
	next := op.backoff * 2
	switch {
	case next < initialBackoff:
		next = initialBackoff
	case next > maxBackoff:
		next = maxBackoff
	}
	op.backoff = next

	op.at = time.Now().Add(op.backoff)

	level.Info(log.WithUserID(op.userID, log.Logger)).Log("msg", "retrying op in flushQueue",
		"op", op.kind, "block", op.blockID.String(), "backoff", op.backoff)

	go func() {
		time.Sleep(op.backoff)

		// Check if shutdown was initiated while waiting out the backoff.
		if i.flushQueues.IsStopped() {
			handleAbandonedOp(op)
			return
		}

		if err := i.flushQueues.Requeue(op); err != nil {
			handleFailedOp(op, err)
		}
	}()
}
45 changes: 30 additions & 15 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,21 +281,17 @@ func (i *Ingester) replayWal() error {
blocks, err := i.store.WAL().AllBlocks()
// todo: should this fail startup?
if err != nil {
level.Error(log.Logger).Log("msg", "error beginning wal replay", "err", err)
return nil
}

level.Info(log.Logger).Log("msg", "beginning wal replay", "numBlocks", len(blocks))

for _, b := range blocks {
tenantID := b.TenantID()
level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID)
level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID, "block", b.BlockID())

instance, err := i.getOrCreateInstance(tenantID)
if err != nil {
return err
}

err = i.replayBlock(b, instance)
err = i.replayBlock(b)
if err != nil {
// there was an error, log and keep on keeping on
level.Error(log.Logger).Log("msg", "error replaying block. removing", "error", err)
Expand All @@ -306,29 +302,48 @@ func (i *Ingester) replayWal() error {
}
}

level.Info(log.Logger).Log("msg", "wal replay complete")

return nil
}

func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock, instance *instance) error {
func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error {
iterator, err := b.Iterator()
if err != nil {
return err
}
defer iterator.Close()

ctx := context.Background()

// Pull first entry to see if block has any data
id, obj, err := iterator.Next(ctx)
if err != nil {
return err
}
if id == nil {
// Block is empty
return nil
}

// Only create instance for tenant now that we know data exists
instance, err := i.getOrCreateInstance(b.TenantID())
if err != nil {
return err
}

for {
id, obj, err := iterator.Next(ctx)
if id == nil {
break
}
// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
writeID := append([]byte(nil), id...)
err = instance.PushBytes(context.Background(), writeID, obj)
if err != nil {
return err
}

// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
writeID := append([]byte(nil), id...)
err = instance.PushBytes(context.Background(), writeID, obj)
id, obj, err = iterator.Next(ctx)
if id == nil {
break
}
if err != nil {
return err
}
Expand Down
Loading