diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c8b9f65171..7bf7661d02f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott) Fix an issue where the tempo-cli was not correctly dumping exemplar results. +* [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio) # v2.7.0 diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 1ff9e445119..5afc82b7d0c 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -28,9 +28,29 @@ const ( blockBuilderServiceName = "block-builder" ConsumerGroup = "block-builder" pollTimeout = 2 * time.Second + cutTime = 10 * time.Second ) var ( + metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "tempo", + Subsystem: "block_builder", + Name: "fetch_duration_seconds", + Help: "Time spent fetching from Kafka.", + NativeHistogramBucketFactor: 1.1, + }, []string{"partition"}) + metricFetchBytesTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "block_builder", + Name: "fetch_bytes_total", + Help: "Total number of bytes fetched from Kafka", + }, []string{"partition"}) + metricFetchRecordsTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "block_builder", + Name: "fetch_records_total", + Help: "Total number of records fetched from Kafka", + }, []string{"partition"}) metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "tempo", Subsystem: "block_builder", @@ -180,6 +200,12 @@ func (b *BlockBuilder) consume(ctx context.Context) error { level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions) defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now()) + // Clear all previous remnants + err := b.wal.Clear() + if err != nil { + return err + } + for _, partition := range partitions { // Consume partition while data remains. // TODO - round-robin one consumption per partition instead to equalize catch-up time. @@ -207,10 +233,12 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov dur = b.cfg.ConsumeCycleDuration topic = b.cfg.IngestStorageConfig.Kafka.Topic group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup + partLabel = strconv.Itoa(int(partition)) startOffset kgo.Offset init bool writer *writer lastRec *kgo.Record + nextCut time.Time end time.Time ) @@ -247,6 +275,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov outer: for { fetches := func() kgo.Fetches { + defer func(t time.Time) { metricFetchDuration.WithLabelValues(partLabel).Observe(time.Since(t).Seconds()) }(time.Now()) ctx2, cancel := context.WithTimeout(ctx, pollTimeout) defer cancel() return b.kafkaClient.PollFetches(ctx2) @@ -257,7 +286,7 @@ outer: // No more data break } - metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc() + metricFetchErrors.WithLabelValues(partLabel).Inc() return false, err } @@ -267,19 +296,23 @@ outer: for iter := fetches.RecordIter(); !iter.Done(); { rec := iter.Next() + metricFetchBytesTotal.WithLabelValues(partLabel).Add(float64(len(rec.Value))) + metricFetchRecordsTotal.WithLabelValues(partLabel).Inc() level.Debug(b.logger).Log( "msg", "processing record", "partition", rec.Partition, "offset", rec.Offset, "timestamp", rec.Timestamp, + "len", len(rec.Value), ) // Initialize on first record if !init { end = rec.Timestamp.Add(dur) // When block will be cut - metricPartitionLagSeconds.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds()) + metricPartitionLagSeconds.WithLabelValues(partLabel).Set(time.Since(rec.Timestamp).Seconds()) writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), b.cfg.BlockConfig, b.overrides, b.wal, b.enc) + nextCut = rec.Timestamp.Add(cutTime) init = true } @@ -295,7 +328,16 @@ outer: break outer } - err := b.pushTraces(rec.Key, rec.Value, writer) + if rec.Timestamp.After(nextCut) { + // Cut before appending this trace + err = writer.cutidle(rec.Timestamp.Add(-cutTime), false) + if err != nil { + return false, err + } + nextCut = rec.Timestamp.Add(cutTime) + } + + err := b.pushTraces(rec.Timestamp, rec.Key, rec.Value, writer) if err != nil { return false, err } @@ -313,6 +355,12 @@ outer: return false, nil } + // Cut any remaining + err = writer.cutidle(time.Time{}, true) + if err != nil { + return false, err + } + err = writer.flush(ctx, b.writer) if err != nil { return false, err @@ -370,14 +418,14 @@ func (b *BlockBuilder) stopping(err error) error { return err } -func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error { +func (b *BlockBuilder) pushTraces(ts time.Time, tenantBytes, reqBytes []byte, p partitionSectionWriter) error { req, err := b.decoder.Decode(reqBytes) if err != nil { return fmt.Errorf("failed to decode trace: %w", err) } defer b.decoder.Reset() - return p.pushBytes(string(tenantBytes), req) + return p.pushBytes(ts, string(tenantBytes), req) } func (b *BlockBuilder) getAssignedActivePartitions() []int32 { diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index 0e99eada347..3c11a3b6208 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -302,7 +302,7 @@ func TestBlockbuilder_committingFails(t *testing.T) { requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1) } -func blockbuilderConfig(t *testing.T, address string) Config { +func blockbuilderConfig(t testing.TB, address string) Config { cfg := Config{} flagext.DefaultValues(&cfg) @@ -327,7 +327,7 @@ type ownEverythingSharder struct{} func (o *ownEverythingSharder) Owns(string) bool { return true } -func newStore(ctx context.Context, t *testing.T) storage.Store { +func newStore(ctx context.Context, t testing.TB) storage.Store { tmpDir := t.TempDir() s, err := storage.NewStore(storage.Config{ Trace: tempodb.Config{ @@ -402,9 +402,10 @@ type mockOverrides struct { dc backend.DedicatedColumns } +func (m *mockOverrides) MaxBytesPerTrace(_ string) int { return 0 } func (m *mockOverrides) DedicatedColumns(_ string) backend.DedicatedColumns { return m.dc } -func newKafkaClient(t *testing.T, config ingest.KafkaConfig) *kgo.Client { +func newKafkaClient(t testing.TB, config ingest.KafkaConfig) *kgo.Client { writeClient, err := kgo.NewClient( kgo.SeedBrokers(config.Address), kgo.AllowAutoTopicCreation(), @@ -427,7 +428,7 @@ func countFlushedTraces(store storage.Store) int { } // nolint: revive -func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Record { +func sendReq(t testing.TB, ctx context.Context, client *kgo.Client) []*kgo.Record { traceID := generateTraceID(t) req := test.MakePushBytesRequest(t, 10, traceID) @@ -463,7 +464,7 @@ func sendTracesFor(t *testing.T, ctx context.Context, client *kgo.Client, dur, i } } -func generateTraceID(t *testing.T) []byte { +func generateTraceID(t testing.TB) []byte { traceID := make([]byte, 16) _, err := rand.Read(traceID) require.NoError(t, err) @@ -478,3 +479,44 @@ func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Clie require.True(t, ok) require.Equal(t, expectedOffset, offset.At) } + +func BenchmarkBlockBuilder(b *testing.B) { + var ( + ctx = context.Background() + _, address = testkafka.CreateCluster(b, 1, testTopic) + store = newStore(ctx, b) + cfg = blockbuilderConfig(b, address) + client = newKafkaClient(b, cfg.IngestStorageConfig.Kafka) + ) + + cfg.ConsumeCycleDuration = 1 * time.Hour + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + + var records []*kgo.Record + + for i := 0; i < 1000; i++ { + records = append(records, sendReq(b, ctx, client)...) + } + + var size int + for _, r := range records { + size += len(r.Value) + } + + b.ResetTimer() + + bb := New(cfg, test.NewTestingLogger(b), newPartitionRingReader(), &mockOverrides{}, store) + + // Startup (without starting the background consume cycle) + err := bb.starting(ctx) + require.NoError(b, err) + + err = bb.consume(ctx) + require.NoError(b, err) + + b.SetBytes(int64(size)) + } +} diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index 445c6de8a29..3937a2d1ce8 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -18,7 +18,7 @@ import ( ) type partitionSectionWriter interface { - pushBytes(tenant string, req *tempopb.PushBytesRequest) error + pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRequest) error flush(ctx context.Context, store tempodb.Writer) error } @@ -50,7 +50,7 @@ func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, } } -func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { +func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRequest) error { level.Debug(p.logger).Log( "msg", "pushing bytes", "tenant", tenant, @@ -69,28 +69,20 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { return fmt.Errorf("failed to unmarshal trace: %w", err) } - var start, end uint64 - for _, b := range tr.ResourceSpans { - for _, ss := range b.ScopeSpans { - for _, s := range ss.Spans { - if start == 0 || s.StartTimeUnixNano < start { - start = s.StartTimeUnixNano - } - if s.EndTimeUnixNano > end { - end = s.EndTimeUnixNano - } - } - } + if err := i.AppendTrace(req.Ids[j], tr, ts); err != nil { + return err } + } - startSeconds := uint32(start / uint64(time.Second)) - endSeconds := uint32(end / uint64(time.Second)) + return nil +} - if err := i.AppendTrace(req.Ids[j], tr, startSeconds, endSeconds); err != nil { +func (p *writer) cutidle(since time.Time, immediate bool) error { + for _, i := range p.m { + if err := i.CutIdle(since, immediate); err != nil { return err } } - return nil } diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go index c0057f634ab..d53428c4a25 100644 --- a/modules/blockbuilder/tenant_store.go +++ b/modules/blockbuilder/tenant_store.go @@ -1,15 +1,21 @@ package blockbuilder import ( + "bytes" "context" + "slices" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/grafana/tempo/modules/blockbuilder/util" + "github.com/grafana/tempo/modules/overrides" + "github.com/grafana/tempo/pkg/livetraces" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/tracesizes" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" @@ -27,6 +33,8 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec( }, []string{"tenant"}, ) +const reasonTraceTooLarge = "trace_too_large" + // TODO - This needs locking type tenantStore struct { tenantID string @@ -44,6 +52,9 @@ type tenantStore struct { blocksMtx sync.Mutex walBlocks []common.WALBlock + + liveTraces *livetraces.LiveTraces + traceSizes *tracesizes.Tracker } func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) { @@ -57,6 +68,8 @@ func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg Block headBlockMtx: sync.Mutex{}, blocksMtx: sync.Mutex{}, enc: enc, + liveTraces: livetraces.New(), + traceSizes: tracesizes.New(), } return s, s.resetHeadBlock() @@ -104,13 +117,68 @@ func (s *tenantStore) resetHeadBlock() error { return nil } -func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error { - // TODO - Do this async, it slows down consumption - if err := s.cutHeadBlock(false); err != nil { +func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, ts time.Time) error { + maxSz := s.overrides.MaxBytesPerTrace(s.tenantID) + + for _, b := range tr.ResourceSpans { + if maxSz > 0 && !s.traceSizes.Allow(traceID, b.Size(), maxSz) { + // Record dropped spans due to trace too large + count := 0 + for _, ss := range b.ScopeSpans { + count += len(ss.Spans) + } + overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID) + continue + } + + s.liveTraces.PushWithTimestamp(ts, traceID, b, 0) + } + return nil +} + +func (s *tenantStore) CutIdle(since time.Time, immediate bool) error { + idle := s.liveTraces.CutIdle(since, immediate) + + slices.SortFunc(idle, func(a, b *livetraces.LiveTrace) int { + return bytes.Compare(a.ID, b.ID) + }) + + for _, e := range idle { + tr := &tempopb.Trace{ + ResourceSpans: e.Batches, + } + + // Get trace timestamp bounds + var start, end uint64 + for _, b := range tr.ResourceSpans { + for _, ss := range b.ScopeSpans { + for _, s := range ss.Spans { + if start == 0 || s.StartTimeUnixNano < start { + start = s.StartTimeUnixNano + } + if s.EndTimeUnixNano > end { + end = s.EndTimeUnixNano + } + } + } + } + + // Convert from unix nanos to unix seconds + startSeconds := uint32(start / uint64(time.Second)) + endSeconds := uint32(end / uint64(time.Second)) + + if err := s.headBlock.AppendTrace(e.ID, tr, startSeconds, endSeconds); err != nil { + return err + } + } + + err := s.headBlock.Flush() + if err != nil { return err } - return s.headBlock.AppendTrace(traceID, tr, start, end) + // Cut head block if needed + return s.cutHeadBlock(false) } func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { @@ -131,6 +199,12 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { if err != nil { return err } + + err = block.Clear() + if err != nil { + return err + } + completeBlocks = append(completeBlocks, completeBlock) } @@ -142,14 +216,13 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { return err } metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc() - } - // Clear the blocks - for _, block := range s.walBlocks { if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil { return err } } + + // Clear the blocks s.walBlocks = s.walBlocks[:0] return nil diff --git a/modules/blockbuilder/writeable_block.go b/modules/blockbuilder/writeable_block.go index ef028ab02f3..361d163fcb2 100644 --- a/modules/blockbuilder/writeable_block.go +++ b/modules/blockbuilder/writeable_block.go @@ -15,6 +15,7 @@ import ( // Overrides is just the set of overrides needed here. type Overrides interface { + MaxBytesPerTrace(string) int DedicatedColumns(string) backend.DedicatedColumns } diff --git a/modules/generator/processor/localblocks/livetraces.go b/modules/generator/processor/localblocks/livetraces.go deleted file mode 100644 index 9d0ec8231b1..00000000000 --- a/modules/generator/processor/localblocks/livetraces.go +++ /dev/null @@ -1,86 +0,0 @@ -package localblocks - -import ( - "hash" - "hash/fnv" - "time" - - v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" -) - -type liveTrace struct { - id []byte - timestamp time.Time - Batches []*v1.ResourceSpans - - sz uint64 -} - -type liveTraces struct { - hash hash.Hash64 - traces map[uint64]*liveTrace - - sz uint64 -} - -func newLiveTraces() *liveTraces { - return &liveTraces{ - hash: fnv.New64(), - traces: make(map[uint64]*liveTrace), - } -} - -func (l *liveTraces) token(traceID []byte) uint64 { - l.hash.Reset() - l.hash.Write(traceID) - return l.hash.Sum64() -} - -func (l *liveTraces) Len() uint64 { - return uint64(len(l.traces)) -} - -func (l *liveTraces) Size() uint64 { - return l.sz -} - -func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool { - token := l.token(traceID) - - tr := l.traces[token] - if tr == nil { - - // Before adding this check against max - // Zero means no limit - if max > 0 && uint64(len(l.traces)) >= max { - return false - } - - tr = &liveTrace{ - id: traceID, - } - l.traces[token] = tr - } - - sz := uint64(batch.Size()) - tr.sz += sz - l.sz += sz - - tr.Batches = append(tr.Batches, batch) - tr.timestamp = time.Now() - return true -} - -func (l *liveTraces) CutIdle(idleSince time.Time, immediate bool) []*liveTrace { - res := []*liveTrace{} - - for k, tr := range l.traces { - if tr.timestamp.Before(idleSince) || immediate { - res = append(res, tr) - l.sz -= tr.sz - delete(l.traces, k) - } - } - - return res -} diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index 0454cd08819..f17bf017ea6 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/otel" gen "github.com/grafana/tempo/modules/generator/processor" + "github.com/grafana/tempo/pkg/livetraces" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" @@ -70,7 +71,7 @@ type Processor struct { flushqueue *flushqueues.PriorityQueue liveTracesMtx sync.Mutex - liveTraces *liveTraces + liveTraces *livetraces.LiveTraces traceSizes *tracesizes.Tracker writer tempodb.Writer @@ -103,7 +104,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid walBlocks: map[uuid.UUID]common.WALBlock{}, completeBlocks: map[uuid.UUID]*ingester.LocalBlock{}, flushqueue: flushqueues.NewPriorityQueue(metricFlushQueueSize.WithLabelValues(tenant)), - liveTraces: newLiveTraces(), + liveTraces: livetraces.New(), traceSizes: tracesizes.New(), closeCh: make(chan struct{}), wg: sync.WaitGroup{}, @@ -597,7 +598,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error { p.liveTracesMtx.Lock() // Record live traces before flushing so we know the high water mark - metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces))) + metricLiveTraces.WithLabelValues(p.tenant).Set(float64(p.liveTraces.Len())) metricLiveTraceBytes.WithLabelValues(p.tenant).Set(float64(p.liveTraces.Size())) since := time.Now().Add(-p.Cfg.TraceIdlePeriod) @@ -611,7 +612,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error { // Sort by ID sort.Slice(tracesToCut, func(i, j int) bool { - return bytes.Compare(tracesToCut[i].id, tracesToCut[j].id) == -1 + return bytes.Compare(tracesToCut[i].ID, tracesToCut[j].ID) == -1 }) for _, t := range tracesToCut { @@ -620,7 +621,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error { ResourceSpans: t.Batches, } - err := p.writeHeadBlock(t.id, tr) + err := p.writeHeadBlock(t.ID, tr) if err != nil { return err } diff --git a/pkg/livetraces/livetraces.go b/pkg/livetraces/livetraces.go new file mode 100644 index 00000000000..7e5e1b9c173 --- /dev/null +++ b/pkg/livetraces/livetraces.go @@ -0,0 +1,90 @@ +package livetraces + +import ( + "hash" + "hash/fnv" + "time" + + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" +) + +type LiveTrace struct { + ID []byte + timestamp time.Time + Batches []*v1.ResourceSpans + + sz uint64 +} + +type LiveTraces struct { + hash hash.Hash64 + Traces map[uint64]*LiveTrace + + sz uint64 +} + +func New() *LiveTraces { + return &LiveTraces{ + hash: fnv.New64(), + Traces: make(map[uint64]*LiveTrace), + } +} + +func (l *LiveTraces) token(traceID []byte) uint64 { + l.hash.Reset() + l.hash.Write(traceID) + return l.hash.Sum64() +} + +func (l *LiveTraces) Len() uint64 { + return uint64(len(l.Traces)) +} + +func (l *LiveTraces) Size() uint64 { + return l.sz +} + +func (l *LiveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool { + return l.PushWithTimestamp(time.Now(), traceID, batch, max) +} + +func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.ResourceSpans, max uint64) bool { + token := l.token(traceID) + + tr := l.Traces[token] + if tr == nil { + + // Before adding this check against max + // Zero means no limit + if max > 0 && uint64(len(l.Traces)) >= max { + return false + } + + tr = &LiveTrace{ + ID: traceID, + } + l.Traces[token] = tr + } + + sz := uint64(batch.Size()) + tr.sz += sz + l.sz += sz + + tr.Batches = append(tr.Batches, batch) + tr.timestamp = ts + return true +} + +func (l *LiveTraces) CutIdle(idleSince time.Time, immediate bool) []*LiveTrace { + res := []*LiveTrace{} + + for k, tr := range l.Traces { + if tr.timestamp.Before(idleSince) || immediate { + res = append(res, tr) + l.sz -= tr.sz + delete(l.Traces, k) + } + } + + return res +} diff --git a/modules/generator/processor/localblocks/livetraces_test.go b/pkg/livetraces/livetraces_test.go similarity index 95% rename from modules/generator/processor/localblocks/livetraces_test.go rename to pkg/livetraces/livetraces_test.go index 60a1f3be12a..7422426c85c 100644 --- a/modules/generator/processor/localblocks/livetraces_test.go +++ b/pkg/livetraces/livetraces_test.go @@ -1,4 +1,4 @@ -package localblocks +package livetraces import ( "math/rand/v2" @@ -10,7 +10,7 @@ import ( ) func TestLiveTracesSizesAndLen(t *testing.T) { - lt := newLiveTraces() + lt := New() expectedSz := uint64(0) expectedLen := uint64(0) diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 7e38e194342..b3b2a45d153 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -366,7 +366,7 @@ func MakeTraceWithTags(traceID []byte, service string, intValue int64) *tempopb. return trace } -func MakePushBytesRequest(t *testing.T, requests int, traceID []byte) *tempopb.PushBytesRequest { +func MakePushBytesRequest(t testing.TB, requests int, traceID []byte) *tempopb.PushBytesRequest { trace := MakeTrace(requests, traceID) b, err := proto.Marshal(trace) require.NoError(t, err) diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 2a6edd0de83..880e9a20b9d 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -153,9 +153,8 @@ func (w *WAL) GetFilepath() string { return w.c.Filepath } -func (w *WAL) ClearFolder(dir string) error { - p := filepath.Join(w.c.Filepath, dir) - return os.RemoveAll(p) +func (w *WAL) Clear() error { + return os.RemoveAll(w.c.Filepath) } func (w *WAL) LocalBackend() *local.Backend {