diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27d27fbe99d..57abfea87b7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
 * [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
 * [BUGFIX] Fixes where ingester may leave file open [#570](https://github.com/grafana/tempo/pull/570)
 * [BUGFIX] Fixes a bug where some blocks were not searched due to query sharding and randomness in blocklist poll. [#583](https://github.com/grafana/tempo/pull/583)
+* [BUGFIX] Fixes issue where wal was deleted before successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593)

 ## v0.6.0

diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go
index 99e300cbe7e..03ddb3c43de 100644
--- a/modules/ingester/flush.go
+++ b/modules/ingester/flush.go
@@ -3,7 +3,7 @@ package ingester
 import (
 	"context"
 	"fmt"
-	"math"
+	"math/rand"
 	"net/http"
 	"strconv"
 	"time"
@@ -38,9 +38,10 @@ var (
 )

 const (
-	// Backoff for retrying 'immediate' flushes. Only counts for queue
-	// position, not wallclock time.
-	flushBackoff = 1 * time.Second
+	initialBackoff      = 30 * time.Second
+	flushJitter         = 10 * time.Second
+	maxBackoff          = 120 * time.Second
+	maxCompleteAttempts = 3
 )

 const (
@@ -99,18 +100,22 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
 }

 type flushOp struct {
-	kind    int
-	from    int64
-	userID  string
-	blockID uuid.UUID
+	kind     int
+	at       time.Time // When to execute
+	attempts uint
+	backoff  time.Duration
+	userID   string
+	blockID  uuid.UUID
 }

 func (o *flushOp) Key() string {
 	return o.userID + "/" + strconv.Itoa(o.kind) + "/" + o.blockID.String()
 }

+// Priority orders entries in the queue. The larger the number the higher the priority, so inverted here to
+// prioritize entries with earliest timestamps.
 func (o *flushOp) Priority() int64 {
-	return -o.from
+	return -o.at.Unix()
 }

 // sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
@@ -138,12 +143,13 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 	}

 	if blockID != uuid.Nil {
-		i.flushQueues.Enqueue(&flushOp{
+		// jitter to help when flushing many instances at the same time
+		// no jitter if immediate (initiated via /flush handler for example)
+		i.enqueue(&flushOp{
 			kind:    opKindComplete,
-			from:    math.MaxInt64,
 			userID:  instance.instanceID,
 			blockID: blockID,
-		})
+		}, !immediate)
 	}

 	// dump any blocks that have been flushed for awhile
@@ -165,49 +171,80 @@ func (i *Ingester) flushLoop(j int) {
 			return
 		}
 		op := o.(*flushOp)
+		op.attempts++
+
+		retry := false

-		var completeBlockID uuid.UUID
-		var err error
 		if op.kind == opKindComplete {
+			// No point in proceeding if shutdown has been initiated since
+			// we won't be able to queue up the next flush op
+			if i.flushQueues.IsStopped() {
+				handleAbandonedOp(op)
+				continue
+			}
+
 			level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
-			instance, exists := i.getInstanceByID(op.userID)
-			if !exists {
-				// instance no longer exists? that's bad, log and continue
-				level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
+			instance, err := i.getOrCreateInstance(op.userID)
+			if err != nil {
+				handleFailedOp(op, err)
 				continue
 			}

-			completeBlockID, err = instance.CompleteBlock(op.blockID)
-			if completeBlockID != uuid.Nil {
+			err = instance.CompleteBlock(op.blockID)
+			if err != nil {
+				handleFailedOp(op, err)
+
+				if op.attempts >= maxCompleteAttempts {
+					level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
+						"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())
+
+					err = instance.ClearCompletingBlock(op.blockID)
+					if err != nil {
+						// Failure to delete the WAL doesn't prevent flushing the block
+						handleFailedOp(op, err)
+					}
+				} else {
+					retry = true
+				}
+			} else {
 				// add a flushOp for the block we just completed
-				i.flushQueues.Enqueue(&flushOp{
+				// No delay
+				i.enqueue(&flushOp{
 					kind:    opKindFlush,
-					from:    time.Now().Unix(),
 					userID:  instance.instanceID,
-					blockID: completeBlockID,
-				})
+					blockID: op.blockID,
+				}, false)
 			}
 		} else {
-			level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
+			level.Info(log.Logger).Log("msg", "flushing block", "userid", op.userID, "block", op.blockID.String())

-			err = i.flushBlock(op.userID, op.blockID)
+			err := i.flushBlock(op.userID, op.blockID)
+			if err != nil {
+				handleFailedOp(op, err)
+				retry = true
+			}
 		}

-		if err != nil {
-			level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
-				"op", op.kind, "block", op.blockID.String(), "err", err)
-			metricFailedFlushes.Inc()
-			// re-queue op with backoff
-			op.from += int64(flushBackoff)
-			i.flushQueues.Requeue(op)
-			continue
+		if retry {
+			i.requeue(op)
+		} else {
+			i.flushQueues.Clear(op)
 		}
-
-		i.flushQueues.Clear(op)
 	}
 }

+func handleFailedOp(op *flushOp, err error) {
+	level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
+		"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts, "err", err)
+	metricFailedFlushes.Inc()
+}
+
+func handleAbandonedOp(op *flushOp) {
+	level.Info(log.WithUserID(op.userID, log.Logger)).Log("msg", "Abandoning op in flush queue because ingester is shutting down",
+		"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts)
+}
+
 func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
 	instance, err := i.getOrCreateInstance(userID)
 	if err != nil {
@@ -229,6 +266,15 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
 		if err != nil {
 			return err
 		}
+
+		// Delete original wal only after successful flush
+		err = instance.ClearCompletingBlock(blockID)
+		if err != nil {
+			// Error deleting wal doesn't fail the flush
+			level.Error(log.Logger).Log("msg", "Error clearing wal", "userID", userID, "blockID", blockID.String(), "err", err)
+			metricFailedFlushes.Inc()
+		}
+
 		metricBlocksFlushed.Inc()
 	} else {
 		return fmt.Errorf("error getting block to flush")
@@ -236,3 +282,58 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {

 	return nil
 }
+
+func (i *Ingester) enqueue(op *flushOp, jitter bool) {
+	delay := time.Duration(0)
+
+	if jitter {
+		delay = time.Duration(rand.Float32() * float32(flushJitter))
+	}
+
+	op.at = time.Now().Add(delay)
+
+	go func() {
+		time.Sleep(delay)
+
+		// Check if shutdown initiated
+		if i.flushQueues.IsStopped() {
+			handleAbandonedOp(op)
+			return
+		}
+
+		err := i.flushQueues.Enqueue(op)
+		if err != nil {
+			handleFailedOp(op, err)
+		}
+	}()
+}
+
+func (i *Ingester) requeue(op *flushOp) {
+	op.backoff *= 2
+	if op.backoff < initialBackoff {
+		op.backoff = initialBackoff
+	}
+	if op.backoff > maxBackoff {
+		op.backoff = maxBackoff
+	}
+
+	op.at = time.Now().Add(op.backoff)
+
+	level.Info(log.WithUserID(op.userID, log.Logger)).Log("msg", "retrying op in flushQueue",
+		"op", op.kind, "block", op.blockID.String(), "backoff", op.backoff)
+
+	go func() {
+		time.Sleep(op.backoff)
+
+		// Check if shutdown initiated
+		if i.flushQueues.IsStopped() {
+			handleAbandonedOp(op)
+			return
+		}
+
+		err := i.flushQueues.Requeue(op)
+		if err != nil {
+			handleFailedOp(op, err)
+		}
+	}()
+}
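Note: the enqueue/requeue pair above replaces the old fixed one-second, queue-position-only backoff with wall-clock exponential backoff plus jitter. Below is a minimal, standalone sketch of the schedule these constants produce; nextBackoff is an illustrative name, not part of the PR, and simply mirrors the clamping in requeue.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	initialBackoff = 30 * time.Second
	flushJitter    = 10 * time.Second
	maxBackoff     = 120 * time.Second
)

// nextBackoff mirrors requeue: double the backoff, then clamp it
// into the [initialBackoff, maxBackoff] range.
func nextBackoff(current time.Duration) time.Duration {
	current *= 2
	if current < initialBackoff {
		current = initialBackoff
	}
	if current > maxBackoff {
		current = maxBackoff
	}
	return current
}

func main() {
	// The first enqueue gets up to flushJitter of random delay,
	// spreading out instances that cut blocks at the same time.
	fmt.Println("initial jitter:", time.Duration(rand.Float32()*float32(flushJitter)))

	// Retries then back off at 30s, 60s, 120s, and stay capped at 120s.
	backoff := time.Duration(0)
	for attempt := 1; attempt <= 5; attempt++ {
		backoff = nextBackoff(backoff)
		fmt.Printf("attempt %d retried after %v\n", attempt, backoff)
	}
}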
diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go
index 2c31b802fd3..eb20448aba0 100644
--- a/modules/ingester/ingester.go
+++ b/modules/ingester/ingester.go
@@ -281,6 +281,7 @@ func (i *Ingester) replayWal() error {
 	blocks, err := i.store.WAL().AllBlocks()
 	// todo: should this fail startup?
 	if err != nil {
+		level.Error(log.Logger).Log("msg", "error beginning wal replay", "err", err)
 		return nil
 	}

@@ -288,14 +289,9 @@ func (i *Ingester) replayWal() error {
 	for _, b := range blocks {
 		tenantID := b.TenantID()

-		level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID)
+		level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID, "block", b.BlockID())

-		instance, err := i.getOrCreateInstance(tenantID)
-		if err != nil {
-			return err
-		}
-
-		err = i.replayBlock(b, instance)
+		err = i.replayBlock(b)
 		if err != nil {
 			// there was an error, log and keep on keeping on
 			level.Error(log.Logger).Log("msg", "error replaying block. removing", "error", err)
@@ -306,10 +302,12 @@ func (i *Ingester) replayWal() error {
 		}
 	}

+	level.Info(log.Logger).Log("msg", "wal replay complete")
+
 	return nil
 }

-func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock, instance *instance) error {
+func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error {
 	iterator, err := b.Iterator()
 	if err != nil {
 		return err
@@ -317,18 +315,35 @@ func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock, instance *instance) e
 	defer iterator.Close()

 	ctx := context.Background()
+
+	// Pull first entry to see if block has any data
+	id, obj, err := iterator.Next(ctx)
+	if err != nil {
+		return err
+	}
+	if id == nil {
+		// Block is empty
+		return nil
+	}
+
+	// Only create instance for tenant now that we know data exists
+	instance, err := i.getOrCreateInstance(b.TenantID())
+	if err != nil {
+		return err
+	}
+
 	for {
-		id, obj, err := iterator.Next(ctx)
-		if id == nil {
-			break
-		}
+		// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
+		writeID := append([]byte(nil), id...)
+		err = instance.PushBytes(context.Background(), writeID, obj)
 		if err != nil {
 			return err
 		}

-		// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
-		writeID := append([]byte(nil), id...)
-		err = instance.PushBytes(context.Background(), writeID, obj)
+		id, obj, err = iterator.Next(ctx)
+		if id == nil {
+			break
+		}
 		if err != nil {
 			return err
 		}
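Note: the rewritten replay loop keeps the comment about the id escaping the iterator. An iterator like this may reuse its backing buffer between Next calls, so an id kept past the current iteration must be copied, which is what append([]byte(nil), id...) does. A small illustrative sketch of the aliasing bug that copy avoids; the reused buffer below stands in for the iterator's internals.

package main

import "fmt"

func main() {
	buf := make([]byte, 4) // reused backing buffer, as an iterator might keep
	var saved [][]byte

	for i := 0; i < 3; i++ {
		copy(buf, []byte{byte(i), byte(i), byte(i), byte(i)})

		aliased := buf                        // escapes the loop but still shares the buffer
		copied := append([]byte(nil), buf...) // independent copy, safe to keep

		saved = append(saved, aliased, copied)
	}

	// The aliased slices all show the final buffer contents;
	// the copies retain the value from their own iteration.
	for _, s := range saved {
		fmt.Println(s)
	}
}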
diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index 53a0e053457..6f480e5b951 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -160,8 +160,8 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes
 	return uuid.Nil, nil
 }

-// CompleteBlock() moves a completingBlock to a completeBlock
-func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
+// CompleteBlock() moves a completingBlock to a completeBlock. The new completeBlock has the same ID
+func (i *instance) CompleteBlock(blockID uuid.UUID) error {
 	i.blocksMtx.Lock()

 	var completingBlock *wal.AppendBlock
@@ -174,7 +174,7 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
 	i.blocksMtx.Unlock()

 	if completingBlock == nil {
-		return uuid.Nil, fmt.Errorf("error finding completingBlock")
+		return fmt.Errorf("error finding completingBlock")
 	}

 	// potentially long running operation placed outside blocksMtx
@@ -182,27 +182,34 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
 	if err != nil {
 		metricFailedFlushes.Inc()
 		level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err)
-		return uuid.Nil, err
+		return err
 	}

-	// remove completingBlock and add completeBlock
 	i.blocksMtx.Lock()
+	i.completeBlocks = append(i.completeBlocks, completeBlock)
+	i.blocksMtx.Unlock()
+
+	return nil
+}
+
+// nolint:interfacer
+func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error {
+	i.blocksMtx.Lock()
+
+	var completingBlock *wal.AppendBlock
 	for j, iterBlock := range i.completingBlocks {
 		if iterBlock.BlockID() == blockID {
+			completingBlock = iterBlock
 			i.completingBlocks = append(i.completingBlocks[:j], i.completingBlocks[j+1:]...)
 			break
 		}
 	}
-	completeBlockID := completeBlock.BlockMeta().BlockID
-	i.completeBlocks = append(i.completeBlocks, completeBlock)
 	i.blocksMtx.Unlock()

-	err = completingBlock.Clear()
-	if err != nil {
-		level.Error(log.Logger).Log("msg", "Error clearing wal", "tenantID", i.instanceID, "err", err)
+	if completingBlock != nil {
+		return completingBlock.Clear()
 	}

-	return completeBlockID, nil
+	return fmt.Errorf("Error finding wal completingBlock to clear")
 }

 // GetBlockToBeFlushed gets a list of blocks that can be flushed to the backend
diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go
index 4879061bf14..2594316880f 100644
--- a/modules/ingester/instance_test.go
+++ b/modules/ingester/instance_test.go
@@ -58,13 +58,12 @@ func TestInstance(t *testing.T) {
 	assert.NoError(t, err, "unexpected error cutting block")
 	assert.NotEqual(t, blockID, uuid.Nil)

-	completeBlockID, err := i.CompleteBlock(blockID)
+	err = i.CompleteBlock(blockID)
 	assert.NoError(t, err, "unexpected error completing block")
-	assert.NotEqual(t, completeBlockID, uuid.Nil)

-	block := i.GetBlockToBeFlushed(completeBlockID)
+	block := i.GetBlockToBeFlushed(blockID)
 	require.NotNil(t, block)
-	assert.Len(t, i.completingBlocks, 0)
+	assert.Len(t, i.completingBlocks, 1)
 	assert.Len(t, i.completeBlocks, 1)

 	err = ingester.store.WriteBlock(context.Background(), block)
@@ -130,19 +129,15 @@ func TestInstanceFind(t *testing.T) {
 	pushAndQuery(t, i, request2)
 	assert.Len(t, i.completingBlocks, 2)

-	_, err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(blockID)
 	assert.NoError(t, err, "unexpected error completing block")
-	assert.Len(t, i.completingBlocks, 1)
+	assert.Len(t, i.completingBlocks, 2)

 	traceID := test.MustTraceID(request)
 	trace, err := i.FindTraceByID(traceID)
 	assert.NotNil(t, trace)
 	assert.NoError(t, err)
-
-	completeBlockID, err := i.CompleteBlock(blockID)
-	assert.EqualError(t, err, "error finding completingBlock")
-	assert.Equal(t, completeBlockID, uuid.Nil)
 }

 func TestInstanceDoesNotRace(t *testing.T) {
@@ -185,14 +180,12 @@ func TestInstanceDoesNotRace(t *testing.T) {
 	go concurrent(func() {
 		blockID, _ := i.CutBlockIfReady(0, 0, false)
 		if blockID != uuid.Nil {
-			completeBlockID, err := i.CompleteBlock(blockID)
+			err := i.CompleteBlock(blockID)
 			assert.NoError(t, err, "unexpected error completing block")
-			if completeBlockID != uuid.Nil {
-				block := i.GetBlockToBeFlushed(completeBlockID)
-				require.NotNil(t, block)
-				err := ingester.store.WriteBlock(context.Background(), block)
-				assert.NoError(t, err, "error writing block")
-			}
+			block := i.GetBlockToBeFlushed(blockID)
+			require.NotNil(t, block)
+			err = ingester.store.WriteBlock(context.Background(), block)
+			assert.NoError(t, err, "error writing block")
 		}
 	})

@@ -444,7 +437,7 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
 			blockID, err := instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
 			require.NoError(t, err)

-			_, err = instance.CompleteBlock(blockID)
+			err = instance.CompleteBlock(blockID)
 			if tc.expectedToCutBlock {
 				assert.NoError(t, err, "unexpected error completing block")
 			}
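Note: taken together, the instance changes split the old CompleteBlock into two steps so the WAL outlives a failed flush. A hedged sketch of the ordering the flush path now follows; the interface below is illustrative only, and the concrete block type and retry handling are elided.

package main

import "github.com/google/uuid"

// instanceOps is an illustrative subset of the instance methods touched by this PR.
type instanceOps interface {
	CompleteBlock(blockID uuid.UUID) error
	GetBlockToBeFlushed(blockID uuid.UUID) interface{} // concrete block type elided
	ClearCompletingBlock(blockID uuid.UUID) error
}

// flushLifecycle shows the ordering the PR establishes: complete, then flush,
// and only then clear the WAL. Before this change the WAL was cleared inside
// CompleteBlock, so a flush that later failed could lose data.
func flushLifecycle(inst instanceOps, writeToBackend func(block interface{}) error, blockID uuid.UUID) error {
	if err := inst.CompleteBlock(blockID); err != nil {
		return err // retried with backoff; the WAL is still intact
	}
	block := inst.GetBlockToBeFlushed(blockID)
	if err := writeToBackend(block); err != nil {
		return err // retried with backoff; the WAL is still intact
	}
	// Delete the WAL only after the backend write succeeded.
	return inst.ClearCompletingBlock(blockID)
}

func main() {}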
diff --git a/pkg/flushqueues/exclusivequeues.go b/pkg/flushqueues/exclusivequeues.go
index 97eb288488c..fc5e5a85cc8 100644
--- a/pkg/flushqueues/exclusivequeues.go
+++ b/pkg/flushqueues/exclusivequeues.go
@@ -3,56 +3,57 @@ package flushqueues
 import (
 	"sync"

-	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/uber-go/atomic"
 )

 type ExclusiveQueues struct {
-	queues     []*util.PriorityQueue
+	queues     []*PriorityQueue
 	index      *atomic.Int32
 	activeKeys sync.Map
+	stopped    bool
 }

 // New creates a new set of flush queues with a prom gauge to track current depth
 func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
 	f := &ExclusiveQueues{
-		queues: make([]*util.PriorityQueue, queues),
+		queues: make([]*PriorityQueue, queues),
 		index:  atomic.NewInt32(0),
 	}

 	for j := 0; j < queues; j++ {
-		f.queues[j] = util.NewPriorityQueue(metric)
+		f.queues[j] = NewPriorityQueue(metric)
 	}

 	return f
 }

 // Enqueue adds the op to the next queue and prevents any other items to be added with this key
-func (f *ExclusiveQueues) Enqueue(op util.Op) {
+func (f *ExclusiveQueues) Enqueue(op Op) error {
 	_, ok := f.activeKeys.Load(op.Key())
 	if ok {
-		return
+		return nil
 	}

 	f.activeKeys.Store(op.Key(), struct{}{})
-	f.Requeue(op)
+	return f.Requeue(op)
 }

 // Dequeue removes the next op from the requested queue. After dequeueing the calling
 // process either needs to call ClearKey or Requeue
-func (f *ExclusiveQueues) Dequeue(q int) util.Op {
+func (f *ExclusiveQueues) Dequeue(q int) Op {
 	return f.queues[q].Dequeue()
 }

 // Requeue adds an op that is presumed to already be covered by activeKeys
-func (f *ExclusiveQueues) Requeue(op util.Op) {
+func (f *ExclusiveQueues) Requeue(op Op) error {
 	flushQueueIndex := int(f.index.Inc()) % len(f.queues)
-	f.queues[flushQueueIndex].Enqueue(op)
+	_, err := f.queues[flushQueueIndex].Enqueue(op)
+	return err
 }

 // Clear unblocks the requested op. This should be called only after a flush has been successful
-func (f *ExclusiveQueues) Clear(op util.Op) {
+func (f *ExclusiveQueues) Clear(op Op) {
 	f.activeKeys.Delete(op.Key())
 }

@@ -69,7 +70,13 @@ func (f *ExclusiveQueues) IsEmpty() bool {

 // Stop closes all queues
 func (f *ExclusiveQueues) Stop() {
+	f.stopped = true
+
 	for _, q := range f.queues {
 		q.Close()
 	}
 }
+
+func (f *ExclusiveQueues) IsStopped() bool {
+	return f.stopped
+}
diff --git a/pkg/flushqueues/exclusivequeues_test.go b/pkg/flushqueues/exclusivequeues_test.go
index bd4f9e0cc15..77724c7bd84 100644
--- a/pkg/flushqueues/exclusivequeues_test.go
+++ b/pkg/flushqueues/exclusivequeues_test.go
@@ -33,12 +33,16 @@ func TestExclusiveQueues(t *testing.T) {
 	}

 	// enqueue twice
-	q.Enqueue(op)
+	err := q.Enqueue(op)
+	assert.NoError(t, err)
+
 	length, err := test.GetGaugeValue(gauge)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, int(length))

-	q.Enqueue(op)
+	err = q.Enqueue(op)
+	assert.NoError(t, err)
+
 	length, err = test.GetGaugeValue(gauge)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, int(length))
@@ -49,7 +53,9 @@ func TestExclusiveQueues(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, 0, int(length))

-	q.Requeue(op)
+	err = q.Requeue(op)
+	assert.NoError(t, err)
+
 	length, err = test.GetGaugeValue(gauge)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, int(length))
@@ -65,7 +71,9 @@ func TestExclusiveQueues(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, 0, int(length))

-	q.Enqueue(op)
+	err = q.Enqueue(op)
+	assert.NoError(t, err)
+
 	length, err = test.GetGaugeValue(gauge)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, int(length))
@@ -87,7 +95,8 @@ func TestMultipleQueues(t *testing.T) {
 		key: uuid.New().String(),
 	}

-	q.Enqueue(op)
+	err := q.Enqueue(op)
+	assert.NoError(t, err)

 	length, err := test.GetGaugeValue(gauge)
 	assert.NoError(t, err)
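Note: ExclusiveQueues now propagates enqueue errors and exposes IsStopped for the shutdown checks in flush.go. A short usage sketch, assuming the package imports at the path shown and using a stand-in op type that implements the new local Op interface:

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/flushqueues"
)

type op struct {
	key      string
	priority int64
}

func (o op) Key() string     { return o.key }
func (o op) Priority() int64 { return o.priority }

func main() {
	q := flushqueues.New(1, nil) // nil gauge, as the package tests do

	// Enqueueing the same key twice is a no-op the second time.
	_ = q.Enqueue(op{key: "tenant/0/block", priority: 1})
	if err := q.Enqueue(op{key: "tenant/0/block", priority: 2}); err != nil {
		fmt.Println("enqueue failed:", err)
	}

	popped := q.Dequeue(0) // blocks until an op is available
	fmt.Println("dequeued:", popped.Key())

	// The key stays blocked until Clear (after success) or Requeue (to retry).
	q.Clear(popped)
}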
diff --git a/pkg/flushqueues/priority_queue.go b/pkg/flushqueues/priority_queue.go
new file mode 100644
index 00000000000..5c140f2957d
--- /dev/null
+++ b/pkg/flushqueues/priority_queue.go
@@ -0,0 +1,130 @@
+package flushqueues
+
+import (
+	"container/heap"
+	"errors"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// PriorityQueue is a priority queue.
+type PriorityQueue struct {
+	lock        sync.Mutex
+	cond        *sync.Cond
+	closing     bool
+	closed      bool
+	hit         map[string]struct{}
+	queue       queue
+	lengthGauge prometheus.Gauge
+}
+
+// Op is an operation on the priority queue.
+type Op interface {
+	Key() string
+	Priority() int64 // The larger the number the higher the priority.
+}
+
+type queue []Op
+
+func (q queue) Len() int           { return len(q) }
+func (q queue) Less(i, j int) bool { return q[i].Priority() > q[j].Priority() }
+func (q queue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
+
+// Push and Pop use pointer receivers because they modify the slice's length,
+// not just its contents.
+func (q *queue) Push(x interface{}) {
+	*q = append(*q, x.(Op))
+}
+
+func (q *queue) Pop() interface{} {
+	old := *q
+	n := len(old)
+	x := old[n-1]
+	*q = old[0 : n-1]
+	return x
+}
+
+// NewPriorityQueue makes a new priority queue.
+func NewPriorityQueue(lengthGauge prometheus.Gauge) *PriorityQueue {
+	pq := &PriorityQueue{
+		hit:         map[string]struct{}{},
+		lengthGauge: lengthGauge,
+	}
+	pq.cond = sync.NewCond(&pq.lock)
+	heap.Init(&pq.queue)
+	return pq
+}
+
+// Length returns the length of the queue.
+func (pq *PriorityQueue) Length() int {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+	return len(pq.queue)
+}
+
+// Close signals that the queue should be closed when it is empty.
+// A closed queue will not accept new items.
+func (pq *PriorityQueue) Close() {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+	pq.closing = true
+	pq.cond.Broadcast()
+}
+
+// DiscardAndClose closes the queue and removes all the items from it.
+func (pq *PriorityQueue) DiscardAndClose() {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+	pq.closed = true
+	pq.queue = nil
+	pq.hit = map[string]struct{}{}
+	pq.cond.Broadcast()
+}
+
+// Enqueue adds an operation to the queue in priority order. Returns
+// true if added; false if the operation was already on the queue.
+func (pq *PriorityQueue) Enqueue(op Op) (bool, error) {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+
+	if pq.closed {
+		return false, errors.New("enqueue on closed queue")
+	}
+
+	_, enqueued := pq.hit[op.Key()]
+	if enqueued {
+		return false, nil
+	}
+
+	pq.hit[op.Key()] = struct{}{}
+	heap.Push(&pq.queue, op)
+	pq.cond.Broadcast()
+	if pq.lengthGauge != nil {
+		pq.lengthGauge.Inc()
+	}
+	return true, nil
+}
+
+// Dequeue will return the op with the highest priority; block if queue is
+// empty; returns nil if queue is closed.
+func (pq *PriorityQueue) Dequeue() Op {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+
+	for len(pq.queue) == 0 && !(pq.closing || pq.closed) {
+		pq.cond.Wait()
+	}
+
+	if len(pq.queue) == 0 && (pq.closing || pq.closed) {
+		pq.closed = true
+		return nil
+	}
+
+	op := heap.Pop(&pq.queue).(Op)
+	delete(pq.hit, op.Key())
+	if pq.lengthGauge != nil {
+		pq.lengthGauge.Dec()
+	}
+	return op
+}
diff --git a/pkg/flushqueues/priority_queue_test.go b/pkg/flushqueues/priority_queue_test.go
new file mode 100644
index 00000000000..ea7e6bc094d
--- /dev/null
+++ b/pkg/flushqueues/priority_queue_test.go
@@ -0,0 +1,86 @@
+package flushqueues
+
+import (
+	"runtime"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type simpleItem int64
+
+func (i simpleItem) Priority() int64 {
+	return int64(i)
+}
+
+func (i simpleItem) Key() string {
+	return strconv.FormatInt(int64(i), 10)
+}
+
+func TestPriorityQueueBasic(t *testing.T) {
+	queue := NewPriorityQueue(nil)
+	assert.Equal(t, 0, queue.Length(), "Expected length = 0")
+
+	_, err := queue.Enqueue(simpleItem(1))
+	assert.NoError(t, err)
+	assert.Equal(t, 1, queue.Length(), "Expected length = 1")
+
+	i, ok := queue.Dequeue().(simpleItem)
+	assert.True(t, ok, "Expected cast to succeed")
+	assert.Equal(t, simpleItem(1), i, "Expected to dequeue simpleItem(1)")
+
+	queue.Close()
+	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
+}
+
+func TestPriorityQueuePriorities(t *testing.T) {
+	queue := NewPriorityQueue(nil)
+
+	_, err := queue.Enqueue(simpleItem(1))
+	assert.NoError(t, err)
+
+	_, err = queue.Enqueue(simpleItem(2))
+	assert.NoError(t, err)
+
+	assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)")
+	assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(1)")
+
+	queue.Close()
+	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
+}
+
+func TestPriorityQueuePriorities2(t *testing.T) {
+	queue := NewPriorityQueue(nil)
+
+	_, err := queue.Enqueue(simpleItem(2))
+	assert.NoError(t, err)
+
+	_, err = queue.Enqueue(simpleItem(1))
+	assert.NoError(t, err)
+
+	assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)")
+	assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(1)")
+
+	queue.Close()
+	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
+}
+
+func TestPriorityQueueWait(t *testing.T) {
+	queue := NewPriorityQueue(nil)
+
+	done := make(chan struct{})
+	go func() {
+		assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
+		close(done)
+	}()
+
+	queue.Close()
+	runtime.Gosched()
+	select {
+	case <-done:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("Close didn't unblock Dequeue.")
+	}
+}
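Note: with flushOp.Priority returning the negated execution timestamp, this max-heap dequeues the op with the earliest at time first. A quick demonstration of that inversion using the new queue; timeOp is a stand-in type, and the import path assumes the package lands where this PR adds it.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/tempo/pkg/flushqueues"
)

type timeOp struct {
	name string
	at   time.Time
}

func (o timeOp) Key() string { return o.name }

// Negating the timestamp makes the earliest time the highest priority,
// matching flushOp.Priority in modules/ingester/flush.go.
func (o timeOp) Priority() int64 { return -o.at.Unix() }

func main() {
	pq := flushqueues.NewPriorityQueue(nil)

	now := time.Now()
	_, _ = pq.Enqueue(timeOp{name: "later", at: now.Add(time.Hour)})
	_, _ = pq.Enqueue(timeOp{name: "sooner", at: now})

	fmt.Println(pq.Dequeue().Key()) // "sooner"
	fmt.Println(pq.Dequeue().Key()) // "later"
}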
diff --git a/tempodb/encoding/complete_block.go b/tempodb/encoding/complete_block.go
index 1f9633b74e4..33db0afe4ae 100644
--- a/tempodb/encoding/complete_block.go
+++ b/tempodb/encoding/complete_block.go
@@ -9,7 +9,6 @@ import (

 	"go.uber.org/atomic"

-	"github.com/google/uuid"
 	"github.com/grafana/tempo/tempodb/backend"
 	"github.com/grafana/tempo/tempodb/encoding/common"
 )
@@ -37,7 +36,7 @@ type CompleteBlock struct {
 func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string) (*CompleteBlock, error) {
 	c := &CompleteBlock{
 		encoding: latestEncoding(),
-		meta:     backend.NewBlockMeta(originatingMeta.TenantID, uuid.New(), currentVersion, cfg.Encoding),
+		meta:     backend.NewBlockMeta(originatingMeta.TenantID, originatingMeta.BlockID, currentVersion, cfg.Encoding),
 		bloom:    common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP),
 		records:  make([]*common.Record, 0),
 		filepath: filepath,
diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go
index 1216e04e838..d586dd03146 100644
--- a/tempodb/wal/append_block.go
+++ b/tempodb/wal/append_block.go
@@ -73,6 +73,7 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo
 		if err != nil {
 			return nil, err
 		}
+		h.appendFile = nil
 	}

 	records := h.appender.Records()
@@ -113,10 +114,12 @@ func (h *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte
 func (h *AppendBlock) Clear() error {
 	if h.readFile != nil {
 		_ = h.readFile.Close()
+		h.readFile = nil
 	}

 	if h.appendFile != nil {
 		_ = h.appendFile.Close()
+		h.appendFile = nil
 	}

 	name := h.fullFilename()
diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go
index 35da1ef9265..3b7614c9247 100644
--- a/tempodb/wal/replay_block.go
+++ b/tempodb/wal/replay_block.go
@@ -24,6 +24,10 @@ func (r *ReplayBlock) TenantID() string {
 	return r.meta.TenantID
 }

+func (r *ReplayBlock) BlockID() string {
+	return r.meta.BlockID.String()
+}
+
 func (r *ReplayBlock) Clear() error {
 	if r.readFile != nil {
 		_ = r.readFile.Close()
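Note: the append_block.go changes nil out file handles after closing them, so that Complete followed later by Clear (for example, when a block exceeds maxCompleteAttempts and its WAL is deleted) never double-closes a file. The pattern in isolation, as a hedged sketch; the handle type here is illustrative, not Tempo's:

package main

import "os"

type handle struct {
	f *os.File
}

// close releases the file once; subsequent calls are no-ops because the
// pointer is cleared after closing, mirroring AppendBlock.Clear.
func (h *handle) close() {
	if h.f != nil {
		_ = h.f.Close()
		h.f = nil
	}
}

func main() {
	f, err := os.CreateTemp("", "wal-example")
	if err != nil {
		panic(err)
	}
	h := &handle{f: f}
	h.close()
	h.close() // safe: the handle was cleared on the first call
	_ = os.Remove(f.Name())
}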