Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [ENHANCEMENT] Add support for S3 V2 signatures. [#352](https://github.com/grafana/tempo/pull/352)
* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
* [BUGFIX] Fix distributors panicking on rollout [#343](https://github.com/grafana/tempo/pull/343)
* [BUGFIX] Fix ingesters occasionally double flushing [#364](https://github.com/grafana/tempo/pull/364)

## v0.3.0

Expand Down
20 changes: 10 additions & 10 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (i *Ingester) Flush() {
}
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
// mark all head blocks as ready to flush.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
i.sweepUsers(true)
w.WriteHeader(http.StatusNoContent)
Expand Down Expand Up @@ -104,9 +104,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {

// see if any complete blocks are ready to be flushed
if instance.GetBlockToBeFlushed() != nil {
i.flushQueueIndex++
flushQueueIndex := i.flushQueueIndex % i.cfg.ConcurrentFlushes
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
i.flushQueues.Enqueue(&flushOp{
time.Now().Unix(),
instance.instanceID,
})
Expand All @@ -120,23 +118,25 @@ func (i *Ingester) flushLoop(j int) {
}()

for {
o := i.flushQueues[j].Dequeue()
o := i.flushQueues.Dequeue(j)
if o == nil {
return
}
op := o.(*flushOp)

level.Debug(util.Logger).Log("msg", "flushing stream", "userid", op.userID, "fp")
level.Debug(util.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
}

if err != nil {
// re-queue failed flush
op.from += int64(flushBackoff)
i.flushQueues[j].Enqueue(op)
i.flushQueues.Requeue(op)
continue
}

i.flushQueues.Clear(op)
}
}

Expand Down
13 changes: 8 additions & 5 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/validation"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
Expand Down Expand Up @@ -47,9 +48,7 @@ type Ingester struct {
lifecycler *ring.Lifecycler
store storage.Store

// One queue per flush thread.
flushQueues []*util.PriorityQueue
flushQueueIndex int
flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup

limiter *Limiter
Expand All @@ -63,12 +62,11 @@ func New(cfg Config, store storage.Store, limits *overrides.Overrides) (*Ingeste
cfg: cfg,
instances: map[string]*instance{},
store: store,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
}

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
for j := 0; j < cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(metricFlushQueueLength)
go i.flushLoop(j)
}

Expand Down Expand Up @@ -136,6 +134,11 @@ func (i *Ingester) stopping(_ error) error {
return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}

if i.flushQueues != nil {
i.flushQueues.Stop()
i.flushQueuesDone.Wait()
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type instance struct {
headBlock *tempodb_wal.AppendBlock
completingBlock *tempodb_wal.AppendBlock
completeBlocks []*tempodb_wal.CompleteBlock
lastBlockCut time.Time

lastBlockCut time.Time

instanceID string
tracesCreatedTotal prometheus.Counter
Expand Down
64 changes: 64 additions & 0 deletions pkg/flushqueues/exclusivequeues.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flushqueues

import (
"sync"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber-go/atomic"
)

// ExclusiveQueues is a set of priority queues that enforces key exclusivity:
// once an op with a given key has been enqueued, Enqueue drops any further op
// with the same key until Clear is called for it.
type ExclusiveQueues struct {
	queues     []*util.PriorityQueue // the member queues; ops are spread across them round-robin
	index      *atomic.Int32         // round-robin cursor advanced on every (re)enqueue
	activeKeys sync.Map              // keys currently enqueued or in flight
}

// New creates a set of flush queues that all report their combined depth to
// the supplied prometheus gauge.
func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
	f := &ExclusiveQueues{
		queues: make([]*util.PriorityQueue, 0, queues),
		index:  atomic.NewInt32(0),
	}

	for i := 0; i < queues; i++ {
		f.queues = append(f.queues, util.NewPriorityQueue(metric))
	}

	return f
}

// Enqueue adds the op to the next queue and prevents any other op with the
// same key from being added until Clear is called for that key.
func (f *ExclusiveQueues) Enqueue(op util.Op) {
	// LoadOrStore makes the test-and-mark atomic. A separate Load followed by
	// Store is a check-then-act race: two concurrent callers could both see
	// the key as absent and both enqueue it, defeating the exclusivity this
	// type exists to provide.
	if _, present := f.activeKeys.LoadOrStore(op.Key(), struct{}{}); present {
		return
	}

	f.Requeue(op)
}

// Dequeue removes the next op from the requested queue. After dequeueing, the
// caller must either Clear the op's key (on success) or Requeue the op.
func (f *ExclusiveQueues) Dequeue(q int) util.Op {
	queue := f.queues[q]
	return queue.Dequeue()
}

// Requeue adds an op that is presumed to already be covered by activeKeys.
// The target queue is chosen round-robin.
func (f *ExclusiveQueues) Requeue(op util.Op) {
	// Inc wraps to a negative value once the int32 counter overflows; in Go a
	// negative dividend yields a negative modulo, which would panic with an
	// out-of-range slice index in a long-lived process. Normalize it.
	idx := int(f.index.Inc()) % len(f.queues)
	if idx < 0 {
		idx += len(f.queues)
	}

	f.queues[idx].Enqueue(op)
}

// ClearKey unblocks the requested key. This should be called only after a flush has been successful
Comment thread
joe-elliott marked this conversation as resolved.
Outdated
func (f *ExclusiveQueues) Clear(op util.Op) {
f.activeKeys.Delete(op.Key())
Comment thread
joe-elliott marked this conversation as resolved.
}

// Stop closes every underlying queue. Blocked Dequeue callers observe the
// close as a nil result.
func (f *ExclusiveQueues) Stop() {
	for i := range f.queues {
		f.queues[i].Close()
	}
}
106 changes: 106 additions & 0 deletions pkg/flushqueues/exclusivequeues_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package flushqueues

import (
"testing"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)

// mockOp is a minimal util.Op implementation used to exercise ExclusiveQueues.
type mockOp struct {
	key string
}

// Key returns the deduplication key for this op.
func (m mockOp) Key() string {
	return m.key
}

// Priority is constant; ordering within a queue is irrelevant to these tests.
func (m mockOp) Priority() int64 {
	return 0
}

func TestExclusiveQueues(t *testing.T) {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "test",
		Name:      "testersons",
	})

	q := New(1, gauge)
	op := mockOp{
		key: "not unique",
	}

	// expectDepth asserts the gauge (total queue depth) currently equals want.
	expectDepth := func(want int) {
		length, err := test.GetGaugeValue(gauge)
		assert.NoError(t, err)
		assert.Equal(t, want, int(length))
	}

	// enqueue twice: the second enqueue of an active key is dropped
	q.Enqueue(op)
	expectDepth(1)
	q.Enqueue(op)
	expectDepth(1)

	// dequeue -> requeue
	_ = q.Dequeue(0)
	expectDepth(0)
	q.Requeue(op)
	expectDepth(1)

	// dequeue -> clear key -> enqueue succeeds again
	_ = q.Dequeue(0)
	expectDepth(0)
	q.Clear(op)
	expectDepth(0)
	q.Enqueue(op)
	expectDepth(1)
}

func TestMultipleQueues(t *testing.T) {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "test",
		Name:      "testersons",
	})

	const (
		totalQueues = 10
		totalItems  = 10
	)
	q := New(totalQueues, gauge)

	// enqueue unique keys and confirm the gauge tracks the total depth
	for added := 1; added <= totalItems; added++ {
		q.Enqueue(mockOp{key: uuid.New().String()})

		length, err := test.GetGaugeValue(gauge)
		assert.NoError(t, err)
		assert.Equal(t, added, int(length))
	}

	// round-robin distribution: every queue should hold exactly one item
	for j := 0; j < totalQueues; j++ {
		assert.NotNil(t, q.Dequeue(j))

		length, err := test.GetGaugeValue(gauge)
		assert.NoError(t, err)
		assert.Equal(t, totalQueues-(j+1), int(length))
	}
}
32 changes: 32 additions & 0 deletions pkg/util/test/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package test

import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// GetCounterValue extracts the current value of a prometheus counter.
func GetCounterValue(metric prometheus.Counter) (float64, error) {
	m := &dto.Metric{}
	if err := metric.Write(m); err != nil {
		return 0, err
	}
	return m.Counter.GetValue(), nil
}

// GetGaugeValue extracts the current value of a prometheus gauge.
func GetGaugeValue(metric prometheus.Gauge) (float64, error) {
	m := &dto.Metric{}
	if err := metric.Write(m); err != nil {
		return 0, err
	}
	return m.Gauge.GetValue(), nil
}

// GetCounterVecValue extracts the current value of the counter in metric
// selected by the single label value.
func GetCounterVecValue(metric *prometheus.CounterVec, label string) (float64, error) {
	var m = &dto.Metric{}
	if err := metric.WithLabelValues(label).Write(m); err != nil {
		return 0, err
	}
	return m.Counter.GetValue(), nil
}
Loading