Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
* [BUGFIX] Fixed issue where the query-frontend didn't log request details when the request was cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)
* [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno)
* [BUGFIX] Fixes min/max time on blocks to be based on span times instead of ingestion time. [#1314](https://github.com/grafana/tempo/pull/1314) (@joe-elliott)

## v1.3.2 / 2022-02-23
* [BUGFIX] Fixed an issue where the query-frontend would corrupt start/end time ranges on searches which included the ingesters [#1295] (@joe-elliott)
Expand Down
1 change: 0 additions & 1 deletion integration/e2e/config-scalable-single-binary.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ingester:
heartbeat_period: 100ms
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
trace_idle_period: 100ms

Expand Down
18 changes: 17 additions & 1 deletion modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/tempo/pkg/flushqueues"
_ "github.com/grafana/tempo/pkg/gogocodec" // force gogo codec registration
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/model/decoder"
v1 "github.com/grafana/tempo/pkg/model/v1"
v2 "github.com/grafana/tempo/pkg/model/v2"
"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -326,7 +327,22 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
func (i *Ingester) replayWal() error {
level.Info(log.Logger).Log("msg", "beginning wal replay")

blocks, err := i.store.WAL().RescanBlocks(log.Logger)
blocks, err := i.store.WAL().RescanBlocks(func(b []byte, dataEncoding string) (uint32, uint32, error) {
d, err := model.NewObjectDecoder(dataEncoding)
if err != nil {
return 0, 0, nil
}

start, end, err := d.FastRange(b)
if err == decoder.ErrUnsupported {
now := uint32(time.Now().Unix())
return now, now, nil
}
if err != nil {
return 0, 0, err
}
return start, end, nil
}, log.Logger)
if err != nil {
return fmt.Errorf("fatal error replaying wal %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,18 @@ func (i *instance) measureReceivedBytes(traceBytes []byte, searchData []byte) {
// Moves any complete traces out of the map to complete traces
func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error {
tracesToCut := i.tracesToCut(cutoff, immediate)
batchDecoder := model.MustNewSegmentDecoder(model.CurrentEncoding)
segmentDecoder := model.MustNewSegmentDecoder(model.CurrentEncoding)

for _, t := range tracesToCut {
// sort batches before cutting to reduce combinations during compaction
sortByteSlices(t.batches)

out, err := batchDecoder.ToObject(t.batches)
out, err := segmentDecoder.ToObject(t.batches)
if err != nil {
return err
}

err = i.writeTraceToHeadBlock(t.traceID, out, t.searchData)
err = i.writeTraceToHeadBlock(t.traceID, out, t.searchData, t.start, t.end)
if err != nil {
return err
}
Expand Down Expand Up @@ -555,11 +555,11 @@ func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []*liveTrac
return tracesToCut
}

func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, searchData [][]byte) error {
func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, searchData [][]byte, start, end uint32) error {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

err := i.headBlock.Append(id, b)
err := i.headBlock.Append(id, b, start, end)
if err != nil {
return err
}
Expand Down
24 changes: 21 additions & 3 deletions modules/ingester/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ingester

import (
"context"
"fmt"
"time"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/util/log"
)

Expand All @@ -20,9 +22,14 @@ var (
)

type liveTrace struct {
batches [][]byte
lastAppend time.Time
traceID []byte
batches [][]byte
lastAppend time.Time
traceID []byte
start uint32
end uint32
decoder model.SegmentDecoder

// byte limits
maxBytes int
currentBytes int

Expand All @@ -39,6 +46,7 @@ func newTrace(traceID []byte, maxBytes int, maxSearchBytes int) *liveTrace {
traceID: traceID,
maxBytes: maxBytes,
maxSearchBytes: maxSearchBytes,
decoder: model.MustNewSegmentDecoder(model.CurrentEncoding),
}
}

Expand All @@ -53,7 +61,17 @@ func (t *liveTrace) Push(_ context.Context, instanceID string, trace []byte, sea
t.currentBytes += reqSize
}

start, end, err := t.decoder.FastRange(trace)
if err != nil {
return fmt.Errorf("failed to get range while adding segment: %w", err)
}
t.batches = append(t.batches, trace)
if t.start == 0 || start < t.start {
t.start = start
}
if t.end == 0 || end > t.end {
t.end = end
}

if searchDataSize := len(searchData); searchDataSize > 0 {
// disable limit when set to 0
Expand Down
43 changes: 40 additions & 3 deletions modules/ingester/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import (
"context"
"testing"

"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
prom_dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTraceMaxSearchBytes(t *testing.T) {
tenantID := "fake"
maxSearchBytes := 100
tr := newTrace(nil, 0, maxSearchBytes)
fakeTrace := make([]byte, 64)

getMetric := func() float64 {
m := &prom_dto.Metric{}
Expand All @@ -20,17 +24,50 @@ func TestTraceMaxSearchBytes(t *testing.T) {
return m.Counter.GetValue()
}

err := tr.Push(context.TODO(), tenantID, nil, make([]byte, maxSearchBytes))
err := tr.Push(context.TODO(), tenantID, fakeTrace, make([]byte, maxSearchBytes))
require.NoError(t, err)
require.Equal(t, float64(0), getMetric())

tooMany := 123

err = tr.Push(context.TODO(), tenantID, nil, make([]byte, tooMany))
err = tr.Push(context.TODO(), tenantID, fakeTrace, make([]byte, tooMany))
require.NoError(t, err)
require.Equal(t, float64(tooMany), getMetric())

err = tr.Push(context.TODO(), tenantID, nil, make([]byte, tooMany))
err = tr.Push(context.TODO(), tenantID, fakeTrace, make([]byte, tooMany))
require.NoError(t, err)
require.Equal(t, float64(tooMany*2), getMetric())
}

func TestTraceStartEndTime(t *testing.T) {
	segdec := model.MustNewSegmentDecoder(model.CurrentEncoding)

	tr := newTrace(nil, 0, 0)

	// push appends a segment with the given start/end to the live trace and
	// asserts the trace's tracked range afterwards.
	push := func(start, end, wantStart, wantEnd uint32) {
		seg, err := segdec.PrepareForWrite(&tempopb.Trace{}, start, end)
		require.NoError(t, err)

		require.NoError(t, tr.Push(context.Background(), "test", seg, nil))

		assert.Equal(t, wantStart, tr.start)
		assert.Equal(t, wantEnd, tr.end)
	}

	// initial push establishes both bounds
	push(10, 20, 10, 20)

	// an earlier start widens the range downward, end is untouched
	push(5, 15, 5, 20)

	// a later end widens the range upward, start is untouched
	push(15, 25, 5, 25)
}
3 changes: 3 additions & 0 deletions pkg/model/segment_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type SegmentDecoder interface {
// The resultant byte slice can then be manipulated using the corresponding ObjectDecoder.
// ToObject is on the write path and should do as little as possible.
ToObject(segments [][]byte) ([]byte, error)
	// FastRange returns the start and end unix epoch timestamps of the provided segment. If it's not possible to efficiently get these
	// values from the underlying encoding then it should return decoder.ErrUnsupported.
FastRange(segment []byte) (uint32, uint32, error)
}

// NewSegmentDecoder returns a Decoder given the passed string.
Expand Down
28 changes: 28 additions & 0 deletions pkg/model/segment_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,31 @@ func TestSegmentDecoderToObjectDecoderRange(t *testing.T) {
})
}
}

func TestSegmentDecoderFastRange(t *testing.T) {
	for _, enc := range AllEncodings {
		t.Run(enc, func(t *testing.T) {
			expectedStart := rand.Uint32()
			expectedEnd := rand.Uint32()

			sd, err := NewSegmentDecoder(enc)
			require.NoError(t, err)

			// write a random trace wrapped with the expected range
			segment, err := sd.PrepareForWrite(test.MakeTrace(100, nil), expectedStart, expectedEnd)
			require.NoError(t, err)

			// confirm FastRange round-trips the range for encodings that support it
			actualStart, actualEnd, err := sd.FastRange(segment)
			if err == decoder.ErrUnsupported {
				// this encoding cannot answer FastRange cheaply; nothing to verify
				return
			}
			require.NoError(t, err)
			require.Equal(t, expectedStart, actualStart)
			require.Equal(t, expectedEnd, actualEnd)
		})
	}
}
5 changes: 5 additions & 0 deletions pkg/model/v1/segment_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/model/decoder"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
)
Expand Down Expand Up @@ -48,3 +49,7 @@ func (d *SegmentDecoder) ToObject(segments [][]byte) ([]byte, error) {
}
return proto.Marshal(wrapper)
}

// FastRange is unsupported by the v1 encoding: v1 segments carry no
// start/end metadata, so callers must fall back to a slower path.
func (d *SegmentDecoder) FastRange([]byte) (uint32, uint32, error) {
	return 0, 0, decoder.ErrUnsupported
}
5 changes: 5 additions & 0 deletions pkg/model/v2/segment_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (d *SegmentDecoder) ToObject(segments [][]byte) ([]byte, error) {
}, minStart, maxEnd)
}

// FastRange returns the start/end unix epoch timestamps stored alongside a
// v2 segment without unmarshalling the trace itself. The range is recovered
// via stripStartEnd, which presumably reads the values marshalWithStartEnd
// prepended — confirm against that encoding.
func (d *SegmentDecoder) FastRange(buff []byte) (uint32, uint32, error) {
	_, start, end, err := stripStartEnd(buff)
	return start, end, err
}

func marshalWithStartEnd(pb proto.Message, start uint32, end uint32) ([]byte, error) {
const uint32Size = 4

Expand Down
18 changes: 12 additions & 6 deletions tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,35 @@ type BlockMeta struct {
}

// NewBlockMeta creates the meta record for a freshly created block.
// Min/MaxID start empty and Start/EndTime are seeded with the creation
// time; ObjectAdded refines all four bounds as records arrive.
func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding, dataEncoding string) *BlockMeta {
	now := time.Now()
	b := &BlockMeta{
		Version: version,
		BlockID: blockID,
		MinID: []byte{},
		MaxID: []byte{},
		TenantID: tenantID,
		StartTime: now,
		EndTime: now,
		Encoding: encoding,
		DataEncoding: dataEncoding,
	}

	return b
}

func (b *BlockMeta) ObjectAdded(id []byte) {
b.EndTime = time.Now()
// ObjectAdded updates the block meta appropriately based on information about an added record
// start/end are unix epoch seconds
func (b *BlockMeta) ObjectAdded(id []byte, start uint32, end uint32) {
startTime := time.Unix(int64(start), 0)
endTime := time.Unix(int64(end), 0)

if b.StartTime.IsZero() || startTime.Before(b.StartTime) {
b.StartTime = startTime
}
if b.EndTime.IsZero() || endTime.After(b.EndTime) {
b.EndTime = endTime
}

if len(b.MinID) == 0 || bytes.Compare(id, b.MinID) == -1 {
b.MinID = id
}

if len(b.MaxID) == 0 || bytes.Compare(id, b.MaxID) == 1 {
b.MaxID = id
}
Expand Down
Loading