Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion integration/microservices/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ services:

synthetic-load-generator:
image: omnition/synthetic-load-generator:1.0.25
scale: 4 # every container = 1000 spans/s
volumes:
- ./load-generator.json:/etc/load-generator.json
environment:
Expand Down
134 changes: 107 additions & 27 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package ingester
import (
"context"
"fmt"
"math"
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/util/services"
"github.com/grafana/tempo/tempodb/wal"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -32,10 +36,15 @@ var (
})
)

type opKind int

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
flushBackoff = 1 * time.Second

complete opKind = iota
Comment thread
annanay25 marked this conversation as resolved.
Outdated
flush opKind = iota
)

// Flush triggers a flush of all in memory traces to disk. This is called
Expand All @@ -52,16 +61,42 @@ func (i *Ingester) Flush() {
}
}

// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
// * Stop incoming writes by exiting from the ring
// * Flush all blocks to backend
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
	// stop accepting new writes
	err := i.markUnavailable()
	if err != nil {
		// Respond with a 500 and bail out. Falling through here (as the previous
		// version did) would call WriteHeader a second time further down, which
		// net/http rejects as a superfluous WriteHeader and would append the
		// success body after the error body.
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("error marking ingester unavailable"))
		return
	}

	// move all data into flushQueue
	i.sweepAllInstances(true)

	// lifecycler should exit the ring on shutdown
	i.lifecycler.SetUnregisterOnShutdown(true)

	// stop ingester which will internally stop lifecycler
	_ = services.StopAndAwaitTerminated(context.Background(), i)

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("ingester successfully shutdown"))
}

// FlushHandler calls sweepAllInstances(true) which will force push all traces into the WAL and force
// mark all head blocks as ready to flush.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
	// Sweep with immediate=true so every instance cuts its traces and head
	// blocks now instead of waiting for the periodic flush ticker.
	const immediate = true
	i.sweepAllInstances(immediate)

	// There is no useful payload to return; just acknowledge the request.
	w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
from int64
userID string
kind opKind
from int64
userID string
completingBlock *wal.AppendBlock
}

func (o *flushOp) Key() string {
Expand All @@ -72,8 +107,8 @@ func (o *flushOp) Priority() int64 {
return -o.from
}

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate bool) {
// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) sweepAllInstances(immediate bool) {
instances := i.getInstances()

for _, instance := range instances {
Expand All @@ -89,24 +124,44 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
return
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
// see if it's ready to cut a block
completingBlock, err := instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut block", "err", err)
return
}

// enqueue completingBlock if not nil
if completingBlock != nil {
instance.waitForFlush.Inc()
i.flushQueues.Enqueue(&flushOp{
kind: complete,
completingBlock: completingBlock,
from: math.MaxInt64,
userID: instance.instanceID,
})
}

// dump any blocks that have been flushed for awhile
err = instance.ClearFlushedBlocks(i.cfg.CompleteBlockTimeout)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// need a way to check that all completingBlocks have been flushed...
if immediate {
for instance.waitForFlush.Load() != 0 {
time.Sleep(100 * time.Millisecond)
}
}

// see if any complete blocks are ready to be flushed
if instance.GetBlockToBeFlushed() != nil {
// these might get double queued if a routine flush coincides with a shutdown .. but that's OK.
for range instance.GetBlocksToBeFlushed() {
i.flushQueues.Enqueue(&flushOp{
time.Now().Unix(),
instance.instanceID,
kind: flush,
from: time.Now().Unix(),
userID: instance.instanceID,
})
}
}
Expand All @@ -124,16 +179,46 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)

// re-queue failed flush
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
if op.kind == complete {
level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID, "fp")
instance, exists := i.getInstanceByID(op.userID)
if !exists {
// instance no longer exists? that's bad, clear and continue
_ = op.completingBlock.Clear()
continue
}

completeBlock, err := instance.writer.CompleteBlock(op.completingBlock, instance)

instance.blocksMtx.Lock()
if err != nil {
// this is a really bad error that results in data loss. most likely due to disk full
Comment thread
annanay25 marked this conversation as resolved.
Outdated
_ = op.completingBlock.Clear()
metricFailedFlushes.Inc()
level.Error(log.Logger).Log("msg", "unable to complete block. THIS BLOCK WAS LOST", "tenantID", op.userID, "err", err)
instance.blocksMtx.Unlock()
continue
}
instance.completeBlocks = append(instance.completeBlocks, completeBlock)
instance.blocksMtx.Unlock()
Comment thread
annanay25 marked this conversation as resolved.
Outdated
} else {
level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err := i.flushUserTraces(op.userID)
if err != nil {
Comment thread
annanay25 marked this conversation as resolved.
Outdated
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)

Comment thread
annanay25 marked this conversation as resolved.
Outdated
// re-queue failed flush
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
}

instance, exists := i.getInstanceByID(op.userID)
if !exists {
continue
}
instance.waitForFlush.Dec()
}

i.flushQueues.Clear(op)
Expand All @@ -150,12 +235,7 @@ func (i *Ingester) flushUserTraces(userID string) error {
return fmt.Errorf("instance id %s not found", userID)
}

for {
block := instance.GetBlockToBeFlushed()
if block == nil {
break
}

for _, block := range instance.GetBlocksToBeFlushed() {
Comment thread
annanay25 marked this conversation as resolved.
Outdated
ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
Expand Down
24 changes: 16 additions & 8 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (i *Ingester) loop(ctx context.Context) error {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false)
i.sweepAllInstances(false)

case <-ctx.Done():
return nil
Expand All @@ -125,13 +125,9 @@ func (i *Ingester) loop(ctx context.Context) error {

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
// This will prevent us accepting any more samples
i.stopIncomingRequests()

// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
err := i.markUnavailable()
if err != nil {
return fmt.Errorf("error stopping ingester: %w", err)
}

if i.flushQueues != nil {
Expand All @@ -142,6 +138,18 @@ func (i *Ingester) stopping(_ error) error {
return nil
}

// markUnavailable stops the ingester from accepting any more writes. If a
// lifecycler is configured it is stopped first, which gracefully exits the
// ring. Returns an error only if the lifecycler fails to terminate cleanly.
func (i *Ingester) markUnavailable() error {
	// Lifecycler can be nil if the ingester is for a flusher.
	if i.lifecycler != nil {
		// Next initiate our graceful exit from the ring.
		if err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler); err != nil {
			return err
		}
	}

	// This will prevent us accepting any more samples. The previous version
	// returned early whenever a lifecycler was present, so this was skipped on
	// the normal shutdown path; it must run in both cases.
	i.stopIncomingRequests()
	return nil
}

// Push implements tempopb.Pusher.Push
func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
Expand Down
46 changes: 18 additions & 28 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/uber-go/atomic"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -59,6 +61,8 @@ type instance struct {
completingBlock *wal.AppendBlock
completeBlocks []*encoding.CompleteBlock

waitForFlush *atomic.Int32

lastBlockCut time.Time

instanceID string
Expand Down Expand Up @@ -133,59 +137,45 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
return nil
}

func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
// CutBlockIfReady cuts a completingBlock from the HeadBlock if ready
func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) (*wal.AppendBlock, error) {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if i.headBlock == nil || i.headBlock.DataLength() == 0 {
return nil
return nil, nil
}

now := time.Now()
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
if i.completingBlock != nil {
return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
}
completingBlock := i.headBlock

// make completingBlock searchable
i.completingBlock = completingBlock

i.completingBlock = i.headBlock
err := i.resetHeadBlock()
if err != nil {
return fmt.Errorf("failed to resetHeadBlock: %w", err)
return nil, fmt.Errorf("failed to resetHeadBlock: %w", err)
}

// todo : this should be a queue of blocks to complete with workers
go func() {
completeBlock, err := i.writer.CompleteBlock(i.completingBlock, i)
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if err != nil {
// this is a really bad error that results in data loss. most likely due to disk full
_ = i.completingBlock.Clear()
metricFailedFlushes.Inc()
i.completingBlock = nil
level.Error(log.Logger).Log("msg", "unable to complete block. THIS BLOCK WAS LOST", "tenantID", i.instanceID, "err", err)
return
}
i.completingBlock = nil
i.completeBlocks = append(i.completeBlocks, completeBlock)
}()
return completingBlock, nil
}

return nil
return nil, nil
}

func (i *instance) GetBlockToBeFlushed() *encoding.CompleteBlock {
func (i *instance) GetBlocksToBeFlushed() []*encoding.CompleteBlock {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

completeBlockList := []*encoding.CompleteBlock{}
for _, c := range i.completeBlocks {
if c.FlushedTime().IsZero() {
return c
completeBlockList = append(completeBlockList, c)
}
}

return nil
return completeBlockList
}

func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error {
Expand Down