forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexclusivequeues.go
More file actions
64 lines (52 loc) · 1.58 KB
/
exclusivequeues.go
File metadata and controls
64 lines (52 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package flushqueues
import (
"sync"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber-go/atomic"
)
// ExclusiveQueues spreads ops across a set of priority queues while tracking
// which op keys are currently active, so that Enqueue can skip an op whose key
// is already present.
type ExclusiveQueues struct {
// queues holds the priority queues that Requeue distributes ops across.
queues []*util.PriorityQueue
// index is incremented on every Requeue; its value modulo len(queues) picks the target queue.
index *atomic.Int32
// activeKeys records the keys added by Enqueue; a key remains until Clear deletes it.
activeKeys sync.Map
}
// New builds an ExclusiveQueues backed by the requested number of priority
// queues. The same prom gauge is handed to every queue so that it can track
// the current depth, and the queue-selection index starts at zero.
func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
	pqs := make([]*util.PriorityQueue, queues)
	for i := range pqs {
		pqs[i] = util.NewPriorityQueue(metric)
	}

	return &ExclusiveQueues{
		queues: pqs,
		index:  atomic.NewInt32(0),
	}
}
// Enqueue adds op to the next queue unless an op with the same key is already
// active, in which case the call does nothing. The key stays reserved until
// Clear is called for it.
//
// The membership check and the reservation happen in a single atomic
// LoadOrStore. A separate Load followed by Store would let two goroutines
// racing on the same key both pass the check and both enqueue the op.
func (f *ExclusiveQueues) Enqueue(op util.Op) {
	if _, loaded := f.activeKeys.LoadOrStore(op.Key(), struct{}{}); loaded {
		// Another op with this key is already queued or being processed.
		return
	}

	f.Requeue(op)
}
// Dequeue returns the next op from the queue at index q. After dequeueing,
// the calling process needs to either call Clear to release the op's key or
// Requeue to put the op back on a queue.
func (f *ExclusiveQueues) Dequeue(q int) util.Op {
	queue := f.queues[q]
	return queue.Dequeue()
}
// Requeue adds an op that is presumed to already be covered by activeKeys.
// It does not touch activeKeys itself; it only advances the shared index and
// places the op on the queue that index selects.
func (f *ExclusiveQueues) Requeue(op util.Op) {
	// The counter is a signed 32-bit value that wraps to negative numbers after
	// math.MaxInt32 increments. Go's % keeps the sign of the dividend, so a
	// signed remainder would then be negative and panic as a slice index.
	// Taking the remainder in uint32 keeps the result in [0, len(f.queues)).
	next := uint32(f.index.Inc()) % uint32(len(f.queues))
	f.queues[next].Enqueue(op)
}
// Clear releases the key held by op so that a later Enqueue with the same key
// is accepted again. It should be called only after the flush for op has
// succeeded.
func (f *ExclusiveQueues) Clear(op util.Op) {
	key := op.Key()
	f.activeKeys.Delete(key)
}
// Stop closes every underlying queue, in slice order.
func (f *ExclusiveQueues) Stop() {
	for i := range f.queues {
		f.queues[i].Close()
	}
}