Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [CHANGE] jsonnet: Add emptyDir data volume to block-builder StatefulSet [#6648](https://github.com/grafana/tempo/pull/6648) (@mapno)
* [CHANGE] Add quick checks to tempo mixin runbook [#6696](https://github.com/grafana/tempo/pull/6696) (@javiermolinar)
* [CHANGE] Deprecate metrics-generator no-local-blocks [#6707](https://github.com/grafana/tempo/pull/6707) (@javiermolinar)
* [CHANGE] Own local block and partition ring helpers [#6808](https://github.com/grafana/tempo/pull/6808) (@javiermolinar)
* [CHANGE] Track invalid trace and span id discards [#6799](https://github.com/grafana/tempo/pull/6799) (@javiermolinar)
* [FEATURE] Add automemlimit support for automatic GOMEMLIMIT configuration. Enable with `memory.automemlimit_enabled: true`. [#6313](https://github.com/grafana/tempo/pull/6313) (@oleg-kozlyuk)
* [FEATURE] Support comparison operators in TraceQL Metrics queries [#6474](https://github.com/grafana/tempo/pull/6474) (@ruslan-mikhailov)
Expand Down
7 changes: 3 additions & 4 deletions modules/livestore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/pkg/ring"
"github.com/grafana/tempo/tempodb/encoding/common"
Expand All @@ -15,9 +14,9 @@ import (
const defaultCompleteBlockTimeout = time.Hour

type Config struct {
Ring ring.Config `yaml:"ring,omitempty"`
PartitionRing ingester.PartitionRingConfig `yaml:"partition_ring" category:"experimental"`
Metrics MetricsConfig `yaml:"metrics"`
Ring ring.Config `yaml:"ring,omitempty"`
PartitionRing PartitionRingConfig `yaml:"partition_ring"`
Comment thread
javiermolinar marked this conversation as resolved.
Metrics MetricsConfig `yaml:"metrics"`

// CommitInterval configures how often the partition reader commits to kafka
// 0s means synchronous commits
Expand Down
7 changes: 3 additions & 4 deletions modules/livestore/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/livetraces"
"github.com/grafana/tempo/pkg/model"
Expand Down Expand Up @@ -101,7 +100,7 @@ type instance struct {
blocksMtx sync.RWMutex
headBlock common.WALBlock
walBlocks map[uuid.UUID]common.WALBlock
completeBlocks map[uuid.UUID]*ingester.LocalBlock
completeBlocks map[uuid.UUID]*LocalBlock
lastCutTime time.Time

// Live traces
Expand All @@ -127,7 +126,7 @@ func newInstance(instanceID string, cfg Config, wal *wal.WAL, completeBlockEncod
wal: wal,
completeBlockEncoding: completeBlockEncoding,
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
completeBlocks: map[uuid.UUID]*LocalBlock{},
liveTraces: livetraces.New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) }, cfg.MaxTraceIdle, cfg.MaxTraceLive, instanceID),
traceSizes: tracesizes.New(),
maxTraceLogger: util_log.NewRateLimitedLogger(maxTraceLogLinesPerSecond, level.Warn(logger)),
Expand Down Expand Up @@ -545,7 +544,7 @@ func (i *instance) completeBlock(ctx context.Context, id uuid.UUID) error {
return nil
}

i.completeBlocks[id] = ingester.NewLocalBlock(ctx, newBlock, i.wal.LocalBackend())
i.completeBlocks[id] = NewLocalBlock(ctx, newBlock, i.wal.LocalBackend())

err = walBlock.Clear()
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions modules/livestore/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.uber.org/atomic"

"github.com/go-kit/log/level"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/collector"
Expand Down Expand Up @@ -145,7 +144,7 @@ func (i *instance) iterateBlocks(ctx context.Context, reqStart, reqEnd time.Time
}

wg.Add(1)
go func(block *ingester.LocalBlock) {
go func(block *LocalBlock) {
defer wg.Done()

if ctx.Err() != nil {
Expand Down Expand Up @@ -498,7 +497,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag

searchWithCache := func(ctx context.Context, _ *backend.BlockMeta, b block) error {
// if not a local block, fall back to regular search
localB, ok := b.(*ingester.LocalBlock)
localB, ok := b.(*LocalBlock)
if !ok {
return search(ctx, b)
}
Expand Down Expand Up @@ -712,7 +711,7 @@ func (i *instance) QueryRange(ctx context.Context, req *tempopb.QueryRangeReques
return nil
}

if localBlock, ok := b.(*ingester.LocalBlock); ok {
if localBlock, ok := b.(*LocalBlock); ok {
resp, err := i.queryRangeCompleteBlock(ctx, localBlock, *req, timeOverlapCutoff, unsafe)
if err != nil {
return err
Expand Down Expand Up @@ -772,7 +771,7 @@ func (i *instance) queryRangeWALBlock(ctx context.Context, b common.WALBlock, ev
return eval.Do(ctx, fetcher, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()), maxSeries)
}

func (i *instance) queryRangeCompleteBlock(ctx context.Context, b *ingester.LocalBlock, req tempopb.QueryRangeRequest, timeOverlapCutoff float64, unsafe bool) ([]*tempopb.TimeSeries, error) {
func (i *instance) queryRangeCompleteBlock(ctx context.Context, b *LocalBlock, req tempopb.QueryRangeRequest, timeOverlapCutoff float64, unsafe bool) ([]*tempopb.TimeSeries, error) {
m := b.BlockMeta()
ctx, span := tracer.Start(ctx, "instance.QueryRange.CompleteBlock", oteltrace.WithAttributes(
attribute.String("block", m.BlockID.String()),
Expand Down
3 changes: 1 addition & 2 deletions modules/livestore/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/ingest/testkafka"
"github.com/grafana/tempo/pkg/model/trace"
Expand Down Expand Up @@ -1165,7 +1164,7 @@ func TestLiveStoreQueryRange(t *testing.T) {

// Get the completed block for testing
inst.blocksMtx.RLock()
var block *ingester.LocalBlock
var block *LocalBlock
for _, b := range inst.completeBlocks {
block = b
break
Expand Down
3 changes: 1 addition & 2 deletions modules/livestore/live_store_background.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
Expand Down Expand Up @@ -337,7 +336,7 @@ func (s *LiveStore) reloadBlocks() error {

level.Info(s.logger).Log("msg", "reloaded complete block", "block", id.String())

lb := ingester.NewLocalBlock(ctx, blk, l)
lb := NewLocalBlock(ctx, blk, l)

inst, err := s.getOrCreateInstance(tenant)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions modules/livestore/live_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
Expand All @@ -25,6 +26,7 @@
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -215,6 +217,42 @@
requireInstanceState(t, liveStore.instances[testTenantID], instanceState{liveTraces: 0, walBlocks: 0, completeBlocks: 1})
}

func TestLiveStoreDropsInvalidCompleteBlocksOnRestart(t *testing.T) {
tmpDir := t.TempDir()

liveStore, err := defaultLiveStore(t, tmpDir)
require.NoError(t, err)

_, _ = pushToLiveStore(t, liveStore)
inst, err := liveStore.getOrCreateInstance(testTenantID)
require.NoError(t, err)

require.NoError(t, inst.cutIdleTraces(true))

Check failure on line 230 in modules/livestore/live_store_test.go

View workflow job for this annotation

GitHub Actions / Lint

not enough arguments in call to inst.cutIdleTraces
walUUID, err := inst.cutBlocks(true)

Check failure on line 231 in modules/livestore/live_store_test.go

View workflow job for this annotation

GitHub Actions / Lint

not enough arguments in call to inst.cutBlocks
Comment thread
javiermolinar marked this conversation as resolved.
Outdated
require.NoError(t, err)
require.NoError(t, inst.completeBlock(context.Background(), walUUID))

var blockID uuid.UUID
for id := range inst.completeBlocks {
blockID = id
break
}
Comment thread
javiermolinar marked this conversation as resolved.
require.NotEqual(t, uuid.Nil, blockID)

writer := backend.NewWriter(liveStore.wal.LocalBackend())
require.NoError(t, writer.Write(context.Background(), vparquet4.DataFileName, blockID, testTenantID, []byte("mangled"), nil))
Comment thread
javiermolinar marked this conversation as resolved.
Outdated

require.NoError(t, services.StopAndAwaitTerminated(t.Context(), liveStore))

liveStore, err = defaultLiveStore(t, tmpDir)
require.NoError(t, err)

inst, ok := liveStore.instances[testTenantID]
if ok {
require.Len(t, inst.completeBlocks, 0)
}
Comment thread
javiermolinar marked this conversation as resolved.
Outdated
}

func TestLiveStoreConsumeDropsOldRecords(t *testing.T) {
// default live store uses the default complete block timeout
ls, _ := defaultLiveStore(t, t.TempDir())
Expand Down
153 changes: 153 additions & 0 deletions modules/livestore/local_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package livestore

import (
"context"
"errors"
"fmt"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"go.uber.org/atomic"
)

// nameFlushed is the object name, inside a block, under which the block's
// flushed timestamp is stored. NewLocalBlock reads it and SetFlushed writes it.
const nameFlushed = "flushed"

// LocalBlock is a block stored in local storage. It can be searched and flushed to a remote backend, and
// permanently tracks the flushed time with a special file in the block.
type LocalBlock struct {
	// BackendBlock is the wrapped block. It is embedded, so its methods are
	// promoted onto LocalBlock unless LocalBlock overrides them.
	common.BackendBlock
	// reader and writer are both built from the same *local.Backend in
	// NewLocalBlock and are used for the flushed marker, the disk cache
	// entries, and as the source when copying the block out in Write.
	reader backend.Reader
	writer backend.Writer

	// flushedTime is the flush time in Unix seconds; 0 means the block has
	// never been flushed (see FlushedTime).
	flushedTime atomic.Int64 // protecting flushedTime because it's accessed concurrently
}

// Compile-time assertions that *LocalBlock satisfies the common.Finder and
// common.Searcher interfaces.
var (
	_ common.Finder = (*LocalBlock)(nil)
	_ common.Searcher = (*LocalBlock)(nil)
)

// NewLocalBlock wraps existingBlock in a LocalBlock whose reader and writer
// both target the local backend l.
//
// It then tries to restore the flushed time from the block's "flushed" marker
// object. This is best effort: if the marker cannot be read or its contents
// cannot be parsed, the block is returned with a zero flushed time and no
// error is reported.
func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *local.Backend) *LocalBlock {
	lb := &LocalBlock{
		BackendBlock: existingBlock,
		reader:       backend.NewReader(l),
		writer:       backend.NewWriter(l),
	}

	raw, err := lb.reader.Read(ctx, nameFlushed, (uuid.UUID)(lb.BlockMeta().BlockID), lb.BlockMeta().TenantID, nil)
	if err != nil {
		// No readable marker: leave flushedTime at its zero value.
		return lb
	}

	var flushedAt time.Time
	if err := flushedAt.UnmarshalText(raw); err != nil {
		// Unparseable marker is treated the same as a missing one.
		return lb
	}

	lb.flushedTime.Store(flushedAt.Unix())
	return lb
}

// FindTraceByID opens a "LocalBlock.FindTraceByID" tracing span and forwards
// the lookup, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.TraceByIDResponse, error) {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.FindTraceByID")
	defer sp.End()

	return c.BackendBlock.FindTraceByID(tracedCtx, id, opts)
}

// Search opens a "LocalBlock.Search" tracing span and forwards the request,
// using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error) {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.Search")
	defer sp.End()

	return c.BackendBlock.Search(tracedCtx, req, opts)
}

// SearchTags opens a "LocalBlock.SearchTags" tracing span and forwards the
// call, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) SearchTags(ctx context.Context, scope traceql.AttributeScope, cb common.TagsCallback, mcb common.MetricsCallback, opts common.SearchOptions) error {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.SearchTags")
	defer sp.End()

	return c.BackendBlock.SearchTags(tracedCtx, scope, cb, mcb, opts)
}

// SearchTagValues opens a "LocalBlock.SearchTagValues" tracing span and
// forwards the call, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) SearchTagValues(ctx context.Context, tag string, cb common.TagValuesCallback, mcb common.MetricsCallback, opts common.SearchOptions) error {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.SearchTagValues")
	defer sp.End()

	return c.BackendBlock.SearchTagValues(tracedCtx, tag, cb, mcb, opts)
}

// SearchTagValuesV2 opens a "LocalBlock.SearchTagValuesV2" tracing span and
// forwards the call, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attribute, cb common.TagValuesCallbackV2, mcb common.MetricsCallback, opts common.SearchOptions) error {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.SearchTagValuesV2")
	defer sp.End()

	return c.BackendBlock.SearchTagValuesV2(tracedCtx, tag, cb, mcb, opts)
}

// Fetch opens a "LocalBlock.Fetch" tracing span and forwards the request,
// using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opts common.SearchOptions) (traceql.FetchSpansResponse, error) {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.Fetch")
	defer sp.End()

	return c.BackendBlock.Fetch(tracedCtx, req, opts)
}

// FetchSpans opens a "LocalBlock.FetchSpans" tracing span and forwards the
// request, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) FetchSpans(ctx context.Context, req traceql.FetchSpansRequest, opts common.SearchOptions) (traceql.FetchSpansOnlyResponse, error) {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.FetchSpans")
	defer sp.End()

	return c.BackendBlock.FetchSpans(tracedCtx, req, opts)
}

// FetchTagValues opens a "LocalBlock.FetchTagValues" tracing span and forwards
// the request, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) FetchTagValues(ctx context.Context, req traceql.FetchTagValuesRequest, cb traceql.FetchTagValuesCallback, mcb common.MetricsCallback, opts common.SearchOptions) error {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.FetchTagValues")
	defer sp.End()

	return c.BackendBlock.FetchTagValues(tracedCtx, req, cb, mcb, opts)
}

// FetchTagNames opens a "LocalBlock.FetchTagNames" tracing span and forwards
// the request, using the span's context, to the embedded BackendBlock.
func (c *LocalBlock) FetchTagNames(ctx context.Context, req traceql.FetchTagsRequest, cb traceql.FetchTagsCallback, mcb common.MetricsCallback, opts common.SearchOptions) error {
	tracedCtx, sp := tracer.Start(ctx, "LocalBlock.FetchTagNames")
	defer sp.End()

	return c.BackendBlock.FetchTagNames(tracedCtx, req, cb, mcb, opts)
}

// FlushedTime reports when the block was flushed, with one-second resolution.
// A stored value of 0 means the block has never been flushed, in which case
// the zero time.Time is returned.
func (c *LocalBlock) FlushedTime() time.Time {
	if secs := c.flushedTime.Load(); secs != 0 {
		return time.Unix(secs, 0)
	}
	return time.Time{}
}

// SetFlushed records the current time as the block's flushed time. The
// timestamp is first written in text form to the block's "flushed" marker
// object through the local writer; only if that write succeeds is the
// in-memory flushed time updated.
func (c *LocalBlock) SetFlushed(ctx context.Context) error {
	now := time.Now()

	stamp, err := now.MarshalText()
	if err != nil {
		return fmt.Errorf("error marshalling flush time to text: %w", err)
	}

	if err := c.writer.Write(ctx, nameFlushed, (uuid.UUID)(c.BlockMeta().BlockID), c.BlockMeta().TenantID, stamp, nil); err != nil {
		return fmt.Errorf("error writing local block flushed file: %w", err)
	}

	c.flushedTime.Store(now.Unix())
	return nil
}

// Write copies the block from the local reader to w and, if the copy
// succeeds, marks the block as flushed via SetFlushed. A copy failure is
// returned wrapped and the block is not marked flushed.
func (c *LocalBlock) Write(ctx context.Context, w backend.Writer) error {
	err := encoding.CopyBlock(ctx, c.BlockMeta(), c.reader, w)
	if err != nil {
		return fmt.Errorf("error copying block from local to remote backend: %w", err)
	}
	return c.SetFlushed(ctx)
}

// SetDiskCache stores data under cacheKey alongside this block (same block ID
// and tenant) using the local writer.
func (c *LocalBlock) SetDiskCache(ctx context.Context, cacheKey string, data []byte) error {
	blockID := (uuid.UUID)(c.BlockMeta().BlockID)
	tenantID := c.BlockMeta().TenantID

	return c.writer.Write(ctx, cacheKey, blockID, tenantID, data, nil)
}

// GetDiskCache reads the entry stored under cacheKey for this block from the
// local reader. A missing entry (backend.ErrDoesNotExist) is not an error: it
// yields (nil, nil). Any other read error is returned unchanged together with
// whatever data the reader produced.
func (c *LocalBlock) GetDiskCache(ctx context.Context, cacheKey string) ([]byte, error) {
	blockID := (uuid.UUID)(c.BlockMeta().BlockID)
	tenantID := c.BlockMeta().TenantID

	cached, err := c.reader.Read(ctx, cacheKey, blockID, tenantID, nil)
	if errors.Is(err, backend.ErrDoesNotExist) {
		// Cache miss.
		return nil, nil
	}
	return cached, err
}
Loading
Loading