diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cc59547148..d07950c40b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ * [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679) * [ENHANCEMENT] Zipkin Support - CombineTraces. [#688](https://github.com/grafana/tempo/pull/688) * [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687) +* [ENHANCEMENT] Reduce marshalling in the ingesters to improve performance. [#694](https://github.com/grafana/tempo/pull/694) + This is kind of a **breaking change**. Rollout all ingesters before any other component to prevent dropped spans. * [CHANGE] Fix Query Frontend grpc settings to avoid noisy error log. [#690](https://github.com/grafana/tempo/pull/690) ## v0.7.0 diff --git a/go.mod b/go.mod index df708eeab8b..3841940c06f 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.20.0 github.com/prometheus/prometheus v1.8.2-0.20210324152458-c7a62b95cea0 + github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e github.com/sirupsen/logrus v1.7.0 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.7.0 diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index b3eddddc451..f0aa7b2239d 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -3,7 +3,6 @@ package distributor import ( "context" "fmt" - "strconv" "strings" "time" @@ -13,6 +12,7 @@ import ( cortex_util "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/gogo/status" + "github.com/segmentio/fasthash/fnv1a" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -235,13 +235,13 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp req.Size()) } - keys, traces, err := requestsByTraceID(req, userID, spanCount) + keys, traces, ids, err := requestsByTraceID(req, userID, spanCount) if err != nil { metricDiscardedSpans.WithLabelValues(reasonInternalError, userID).Add(float64(spanCount)) return nil, err } - err = d.sendToIngestersViaBytes(ctx, userID, traces, keys) + err = d.sendToIngestersViaBytes(ctx, userID, traces, keys, ids) if err != nil { recordDiscaredSpans(err, userID, spanCount) } @@ -249,16 +249,15 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp return nil, err // PushRequest is ignored, so no reason to create one } -func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*tempopb.PushRequest, keys []uint32) error { - +func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*tempopb.Trace, keys []uint32, ids [][]byte) error { // Marshal to bytes once - rawRequests := make([][]byte, len(traces)) + marshalledTraces := make([][]byte, len(traces)) for i, t := range traces { b, err := t.Marshal() if err != nil { return errors.Wrap(err, "failed to marshal PushRequest") } - rawRequests[i] = b + marshalledTraces[i] = b } op := ring.WriteNoExtend @@ -267,17 +266,18 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string } err := ring.DoBatch(ctx, op, d.ingestersRing, keys, func(ingester ring.InstanceDesc, indexes []int) error { - localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) defer cancel() localCtx = user.InjectOrgID(localCtx, userID) req := tempopb.PushBytesRequest{ - Requests: make([]tempopb.PreallocRequest, len(indexes)), + Traces: make([]tempopb.PreallocBytes, len(indexes)), + Ids: make([]tempopb.PreallocBytes, len(indexes)), } for i, j := range indexes { - req.Requests[i].Request = rawRequests[j][0:] + req.Traces[i].Slice = marshalledTraces[j][0:] + req.Ids[i].Slice = ids[j] } c, err := d.pool.GetClientFor(ingester.Addr) @@ -306,29 +306,37 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } -func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) ([]uint32, []*tempopb.PushRequest, error) { - const expectedTracesPerBatch = 10 // roughly what we're seeing through metrics - expectedSpansPerTrace := spanCount / expectedTracesPerBatch +// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring +// and traces to pass onto the ingesters. +func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) ([]uint32, []*tempopb.Trace, [][]byte, error) { + type traceAndID struct { + id []byte + trace *tempopb.Trace + } - requestsByTrace := make(map[uint32]*tempopb.PushRequest) - spansByILS := make(map[string]*v1.InstrumentationLibrarySpans) + const tracesPerBatch = 20 // p50 of internal env + tracesByID := make(map[uint32]*traceAndID, tracesPerBatch) + spansByILS := make(map[uint32]*v1.InstrumentationLibrarySpans) for _, ils := range req.Batch.InstrumentationLibrarySpans { for _, span := range ils.Spans { - if !validation.ValidTraceID(span.TraceId) { - return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit") + traceID := span.TraceId + if !validation.ValidTraceID(traceID) { + return nil, nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit") } - traceKey := util.TokenFor(userID, span.TraceId) - ilsKey := strconv.Itoa(int(traceKey)) + traceKey := util.TokenFor(userID, traceID) + ilsKey := traceKey if ils.InstrumentationLibrary != nil { - ilsKey = ilsKey + ils.InstrumentationLibrary.Name + ils.InstrumentationLibrary.Version + ilsKey = fnv1a.AddString32(ilsKey, ils.InstrumentationLibrary.Name) + ilsKey = fnv1a.AddString32(ilsKey, ils.InstrumentationLibrary.Version) } + existingILS, ok := spansByILS[ilsKey] if !ok { existingILS = &v1.InstrumentationLibrarySpans{ InstrumentationLibrary: ils.InstrumentationLibrary, - Spans: make([]*v1.Span, 0, expectedSpansPerTrace), + Spans: make([]*v1.Span, 0, spanCount/tracesPerBatch), } spansByILS[ilsKey] = existingILS } @@ -339,31 +347,38 @@ func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) ( continue } - existingReq, ok := requestsByTrace[traceKey] + existingTrace, ok := tracesByID[traceKey] if !ok { - existingReq = &tempopb.PushRequest{ - Batch: &v1.ResourceSpans{ - InstrumentationLibrarySpans: make([]*v1.InstrumentationLibrarySpans, 0, len(req.Batch.InstrumentationLibrarySpans)), // assume most spans belong to the same trace - Resource: req.Batch.Resource, + existingTrace = &traceAndID{ + id: traceID, + trace: &tempopb.Trace{ + Batches: make([]*v1.ResourceSpans, 0, spanCount/tracesPerBatch), }, } - requestsByTrace[traceKey] = existingReq + + tracesByID[traceKey] = existingTrace } - existingReq.Batch.InstrumentationLibrarySpans = append(existingReq.Batch.InstrumentationLibrarySpans, existingILS) + + existingTrace.trace.Batches = append(existingTrace.trace.Batches, &v1.ResourceSpans{ + Resource: req.Batch.Resource, + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{existingILS}, + }) } } - metricTracesPerBatch.Observe(float64(len(requestsByTrace))) + metricTracesPerBatch.Observe(float64(len(tracesByID))) - keys := make([]uint32, 0, len(requestsByTrace)) - pushRequests := make([]*tempopb.PushRequest, 0, len(requestsByTrace)) + keys := make([]uint32, 0, len(tracesByID)) + traces := make([]*tempopb.Trace, 0, len(tracesByID)) + ids := make([][]byte, 0, len(tracesByID)) - for k, r := range requestsByTrace { + for k, r := range tracesByID { keys = append(keys, k) - pushRequests = append(pushRequests, r) + traces = append(traces, r.trace) + ids = append(ids, r.id) } - return keys, pushRequests, nil + return keys, traces, ids, nil } func recordDiscaredSpans(err error, userID string, spanCount int) { diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index 04b7d65195d..afc43bbee5e 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -47,19 +47,21 @@ func TestRequestsByTraceID(t *testing.T) { traceIDB := []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} tests := []struct { - name string - request *tempopb.PushRequest - expectedKeys []uint32 - expectedReqs []*tempopb.PushRequest - expectedErr error + name string + request *tempopb.PushRequest + expectedKeys []uint32 + expectedTraces []*tempopb.Trace + expectedIDs [][]byte + expectedErr error }{ { name: "empty", request: &tempopb.PushRequest{ Batch: &v1.ResourceSpans{}, }, - expectedKeys: []uint32{}, - expectedReqs: []*tempopb.PushRequest{}, + expectedKeys: []uint32{}, + expectedTraces: []*tempopb.Trace{}, + expectedIDs: [][]byte{}, }, { name: "bad trace id", @@ -90,15 +92,20 @@ func TestRequestsByTraceID(t *testing.T) { }}}}}, }, expectedKeys: []uint32{util.TokenFor(util.FakeTenantID, traceIDA)}, - expectedReqs: []*tempopb.PushRequest{ + expectedTraces: []*tempopb.Trace{ { - Batch: &v1.ResourceSpans{ - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - TraceId: traceIDA, - }}}}}}, + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + TraceId: traceIDA, + }}}}}}, + }, + }, + expectedIDs: [][]byte{ + traceIDA, }, }, { @@ -116,24 +123,31 @@ func TestRequestsByTraceID(t *testing.T) { }}}}}, }, expectedKeys: []uint32{util.TokenFor(util.FakeTenantID, traceIDA), util.TokenFor(util.FakeTenantID, traceIDB)}, - expectedReqs: []*tempopb.PushRequest{ + expectedTraces: []*tempopb.Trace{ { - Batch: &v1.ResourceSpans{ - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - TraceId: traceIDA, - }}}}}, + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + TraceId: traceIDA, + }}}}}}, }, { - Batch: &v1.ResourceSpans{ - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - TraceId: traceIDB, - }}}}}}, + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + TraceId: traceIDB, + }}}}}}, + }, + }, + expectedIDs: [][]byte{ + traceIDA, + traceIDB, }, }, { @@ -154,30 +168,37 @@ func TestRequestsByTraceID(t *testing.T) { }}}}}, }, expectedKeys: []uint32{util.TokenFor(util.FakeTenantID, traceIDA), util.TokenFor(util.FakeTenantID, traceIDB)}, - expectedReqs: []*tempopb.PushRequest{ + expectedTraces: []*tempopb.Trace{ { - Batch: &v1.ResourceSpans{ - Resource: &v1_resource.Resource{ - DroppedAttributesCount: 1, - }, - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - TraceId: traceIDA, - }}}}}, + Batches: []*v1.ResourceSpans{ + { + Resource: &v1_resource.Resource{ + DroppedAttributesCount: 1, + }, + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + TraceId: traceIDA, + }}}}}}, }, { - Batch: &v1.ResourceSpans{ - Resource: &v1_resource.Resource{ - DroppedAttributesCount: 1, - }, - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - TraceId: traceIDB, - }}}}}}, + Batches: []*v1.ResourceSpans{ + { + Resource: &v1_resource.Resource{ + DroppedAttributesCount: 1, + }, + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + TraceId: traceIDB, + }}}}}}, + }, + }, + expectedIDs: [][]byte{ + traceIDA, + traceIDB, }, }, { @@ -198,30 +219,37 @@ func TestRequestsByTraceID(t *testing.T) { }}}}}, }, expectedKeys: []uint32{util.TokenFor(util.FakeTenantID, traceIDA), util.TokenFor(util.FakeTenantID, traceIDB)}, - expectedReqs: []*tempopb.PushRequest{ + expectedTraces: []*tempopb.Trace{ { - Batch: &v1.ResourceSpans{ - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - InstrumentationLibrary: &v1_common.InstrumentationLibrary{ - Name: "test", - }, - Spans: []*v1.Span{ - { - TraceId: traceIDA, - }}}}}, + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + InstrumentationLibrary: &v1_common.InstrumentationLibrary{ + Name: "test", + }, + Spans: []*v1.Span{ + { + TraceId: traceIDA, + }}}}}}, }, { - Batch: &v1.ResourceSpans{ - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - InstrumentationLibrary: &v1_common.InstrumentationLibrary{ - Name: "test", - }, - Spans: []*v1.Span{ - { - TraceId: traceIDB, - }}}}}}, + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + InstrumentationLibrary: &v1_common.InstrumentationLibrary{ + Name: "test", + }, + Spans: []*v1.Span{ + { + TraceId: traceIDB, + }}}}}}, + }, + }, + expectedIDs: [][]byte{ + traceIDA, + traceIDB, }, }, { @@ -247,34 +275,39 @@ func TestRequestsByTraceID(t *testing.T) { }}}}}, }, expectedKeys: []uint32{util.TokenFor(util.FakeTenantID, traceIDB)}, - expectedReqs: []*tempopb.PushRequest{ + expectedTraces: []*tempopb.Trace{ { - Batch: &v1.ResourceSpans{ - Resource: &v1_resource.Resource{ - DroppedAttributesCount: 3, - }, - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - InstrumentationLibrary: &v1_common.InstrumentationLibrary{ - Name: "test", - }, - Spans: []*v1.Span{ - { - TraceId: traceIDB, - Name: "spanA", + Batches: []*v1.ResourceSpans{ + { + Resource: &v1_resource.Resource{ + DroppedAttributesCount: 3, + }, + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + InstrumentationLibrary: &v1_common.InstrumentationLibrary{ + Name: "test", }, - { - TraceId: traceIDB, - Name: "spanB", - }}}}}, + Spans: []*v1.Span{ + { + TraceId: traceIDB, + Name: "spanA", + }, + { + TraceId: traceIDB, + Name: "spanB", + }}}}}}, }, }, + expectedIDs: [][]byte{ + traceIDB, + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - keys, reqs, err := requestsByTraceID(tt.request, util.FakeTenantID, 1) + keys, reqs, ids, err := requestsByTraceID(tt.request, util.FakeTenantID, 1) + require.Equal(t, len(keys), len(reqs)) for i, expectedKey := range tt.expectedKeys { foundIndex := -1 @@ -287,9 +320,10 @@ func TestRequestsByTraceID(t *testing.T) { require.NotEqual(t, -1, foundIndex, "expected key %d not found", foundIndex) // now confirm that the request at this position is the expected one - expectedReq := tt.expectedReqs[i] + expectedReq := tt.expectedTraces[i] actualReq := reqs[foundIndex] assert.Equal(t, expectedReq, actualReq) + assert.Equal(t, tt.expectedIDs[i], ids[foundIndex]) } assert.Equal(t, tt.expectedErr, err) @@ -297,6 +331,37 @@ func TestRequestsByTraceID(t *testing.T) { } } +func BenchmarkTestsByRequestID(b *testing.B) { + spansPer := 100 + batches := 10 + traces := []*tempopb.Trace{ + test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), + test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), + test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0C, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), + test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0D, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), + } + ils := make([][]*v1.InstrumentationLibrarySpans, batches) + + for i := 0; i < batches; i++ { + for _, t := range traces { + ils[i] = append(ils[i], t.Batches[i].InstrumentationLibrarySpans...) + } + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for _, blerg := range ils { + _, _, _, err := requestsByTraceID(&tempopb.PushRequest{ + Batch: &v1.ResourceSpans{ + InstrumentationLibrarySpans: blerg, + }, + }, "test", spansPer*len(traces)) + require.NoError(b, err) + } + } +} + func TestDistributor(t *testing.T) { for i, tc := range []struct { lines int diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go index ef99a218b2a..bdbd291eb36 100644 --- a/modules/ingester/flush.go +++ b/modules/ingester/flush.go @@ -144,6 +144,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) { } if blockID != uuid.Nil { + level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "userid", instance.instanceID, "block", blockID) // jitter to help when flushing many instances at the same time // no jitter if immediate (initiated via /flush handler for example) i.enqueue(&flushOp{ diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index b3cb43873d6..4fc7c5ca607 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -7,12 +7,14 @@ import ( "time" "github.com/go-kit/kit/log/level" + "github.com/gogo/status" "github.com/opentracing/opentracing-go" ot_log "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" + "google.golang.org/grpc/codes" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util/log" @@ -158,13 +160,15 @@ func (i *Ingester) markUnavailable() { i.stopIncomingRequests() } -// Push implements tempopb.Pusher.Push +// Push implements tempopb.Pusher.Push (super deprecated) func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) { + if i.readonly { + return nil, ErrReadOnly + } + instanceID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err - } else if i.readonly { - return nil, ErrReadOnly } instance, err := i.getOrCreateInstance(instanceID) @@ -178,23 +182,45 @@ func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb // PushBytes implements tempopb.Pusher.PushBytes func (i *Ingester) PushBytes(ctx context.Context, req *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) { + if i.readonly { + return nil, ErrReadOnly + } + + if len(req.Traces) != len(req.Ids) { + return nil, status.Errorf(codes.InvalidArgument, "mismatched traces/ids length: %d, %d", len(req.Traces), len(req.Ids)) + } + + instanceID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } - // Unmarshal and push each request + instance, err := i.getOrCreateInstance(instanceID) + if err != nil { + return nil, err + } + + // Unmarshal and push each request (deprecated) for _, v := range req.Requests { r := tempopb.PushRequest{} - err := r.Unmarshal(v.Request) + err := r.Unmarshal(v.Slice) if err != nil { return nil, err } - _, err = i.Push(ctx, &r) + err = instance.Push(ctx, &r) if err != nil { return nil, err } } - // Reuse request instead of handing over to GC - tempopb.ReuseRequest(req) + // Unmarshal and push each trace + for i := range req.Traces { + err := instance.PushBytes(ctx, req.Ids[i].Slice, req.Traces[i].Slice) + if err != nil { + return nil, err + } + } return &tempopb.PushResponse{}, nil } @@ -218,7 +244,7 @@ func (i *Ingester) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequ return &tempopb.TraceByIDResponse{}, nil } - trace, err := inst.FindTraceByID(req.TraceID) + trace, err := inst.FindTraceByID(ctx, req.TraceID) if err != nil { return nil, err } diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 990a9319ca9..e9dd648bd46 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/tempo/modules/storage" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" @@ -77,11 +78,7 @@ func TestFullTraceReturned(t *testing.T) { model.SortTrace(trace) // push the first batch - _, err = ingester.Push(ctx, - &tempopb.PushRequest{ - Batch: trace.Batches[0], - }) - assert.NoError(t, err, "unexpected error pushing") + pushBatch(t, ingester, trace.Batches[0], traceID) // force cut all traces for _, instance := range ingester.instances { @@ -90,11 +87,14 @@ func TestFullTraceReturned(t *testing.T) { } // push the 2nd batch - _, err = ingester.Push(ctx, - &tempopb.PushRequest{ - Batch: trace.Batches[1], - }) - assert.NoError(t, err, "unexpected error pushing") + pushBatch(t, ingester, trace.Batches[1], traceID) + + // make sure the trace comes back whole + foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{ + TraceID: traceID, + }) + assert.NoError(t, err, "unexpected error querying") + assert.True(t, proto.Equal(trace, foundTrace.Trace)) // force cut all traces for _, instance := range ingester.instances { @@ -102,13 +102,59 @@ func TestFullTraceReturned(t *testing.T) { assert.NoError(t, err, "unexpected error cutting traces") } + // make sure the trace comes back whole + foundTrace, err = ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{ + TraceID: traceID, + }) + assert.NoError(t, err, "unexpected error querying") + assert.True(t, proto.Equal(trace, foundTrace.Trace)) +} + +func TestDeprecatedPush(t *testing.T) { + tmpDir, err := ioutil.TempDir("/tmp", "") + assert.NoError(t, err, "unexpected error getting tempdir") + defer os.RemoveAll(tmpDir) + + ctx := user.InjectOrgID(context.Background(), "test") + ingester, _, _ := defaultIngester(t, tmpDir) + + traceID := make([]byte, 16) + _, err = rand.Read(traceID) + assert.NoError(t, err) + trace := test.MakeTrace(2, traceID) // 2 batches + model.SortTrace(trace) + + // push the first batch using the deprecated method + pushDeprecatedBatch(t, ingester, trace.Batches[0]) + + // force cut all traces + for _, instance := range ingester.instances { + err = instance.CutCompleteTraces(0, true) + assert.NoError(t, err, "unexpected error cutting traces") + } + + // push the 2nd batch + pushBatch(t, ingester, trace.Batches[1], traceID) + // make sure the trace comes back whole foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{ TraceID: traceID, }) assert.NoError(t, err, "unexpected error querying") - equal := proto.Equal(trace, foundTrace.Trace) - assert.True(t, equal) + assert.True(t, proto.Equal(trace, foundTrace.Trace)) + + // force cut all traces + for _, instance := range ingester.instances { + err = instance.CutCompleteTraces(0, true) + assert.NoError(t, err, "unexpected error cutting traces") + } + + // make sure the trace comes back whole + foundTrace, err = ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{ + TraceID: traceID, + }) + assert.NoError(t, err, "unexpected error querying") + assert.True(t, proto.Equal(trace, foundTrace.Trace)) } func TestWal(t *testing.T) { @@ -247,14 +293,9 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, traceIDs = append(traceIDs, id) } - ctx := user.InjectOrgID(context.Background(), "test") - for _, trace := range traces { + for i, trace := range traces { for _, batch := range trace.Batches { - _, err := ingester.Push(ctx, - &tempopb.PushRequest{ - Batch: batch, - }) - require.NoError(t, err, "unexpected error pushing") + pushBatch(t, ingester, batch, traceIDs[i]) } } @@ -284,3 +325,48 @@ func defaultLimitsTestConfig() overrides.Limits { flagext.DefaultValues(&limits) return limits } + +func pushBatch(t *testing.T, i *Ingester, batch *v1.ResourceSpans, id []byte) { + ctx := user.InjectOrgID(context.Background(), "test") + + pbTrace := &tempopb.Trace{ + Batches: []*v1.ResourceSpans{batch}, + } + + bytesTrace, err := proto.Marshal(pbTrace) + require.NoError(t, err) + + _, err = i.PushBytes(ctx, &tempopb.PushBytesRequest{ + Traces: []tempopb.PreallocBytes{ + { + Slice: bytesTrace, + }, + }, + Ids: []tempopb.PreallocBytes{ + { + Slice: id, + }, + }, + }) + require.NoError(t, err) +} + +func pushDeprecatedBatch(t *testing.T, i *Ingester, batch *v1.ResourceSpans) { + ctx := user.InjectOrgID(context.Background(), "test") + + pbTrace := &tempopb.PushRequest{ + Batch: batch, + } + + bytesTrace, err := proto.Marshal(pbTrace) + require.NoError(t, err) + + _, err = i.PushBytes(ctx, &tempopb.PushBytesRequest{ + Requests: []tempopb.PreallocBytes{ + { + Slice: bytesTrace, + }, + }, + }) + require.NoError(t, err) +} diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index e2377583568..634e272a12f 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "encoding/hex" "fmt" "hash" "hash/fnv" @@ -22,6 +23,8 @@ import ( "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/grafana/tempo/pkg/validation" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" @@ -95,6 +98,8 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l * return i, nil } +// Push is used to push an entire tempopb.PushRequest. It is depecrecated and only required +// for older protocols. func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error { // check for max traces before grabbing the lock to better load shed err := i.limiter.AssertMaxTracesPerUser(i.instanceID, int(i.traceCount.Load())) @@ -105,16 +110,41 @@ func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error { i.tracesMtx.Lock() defer i.tracesMtx.Unlock() - trace, err := i.getOrCreateTrace(req) + id, err := pushRequestTraceID(req) if err != nil { return err } - if err := trace.Push(ctx, req); err != nil { + t := &tempopb.Trace{ + Batches: []*v1.ResourceSpans{req.Batch}, + } + + bytes, err := proto.Marshal(t) + if err != nil { return err } - return nil + trace := i.getOrCreateTrace(id) + return trace.Push(ctx, bytes) +} + +// PushBytes is used to push an unmarshalled tempopb.Trace to the instance +func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte) error { + if !validation.ValidTraceID(id) { + return status.Errorf(codes.InvalidArgument, "%s is not a valid traceid", hex.EncodeToString(id)) + } + + // check for max traces before grabbing the lock to better load shed + err := i.limiter.AssertMaxTracesPerUser(i.instanceID, int(i.traceCount.Load())) + if err != nil { + return status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err) + } + + i.tracesMtx.Lock() + defer i.tracesMtx.Unlock() + + trace := i.getOrCreateTrace(id) + return trace.Push(ctx, traceBytes) } // Moves any complete traces out of the map to complete traces @@ -122,9 +152,9 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error tracesToCut := i.tracesToCut(cutoff, immediate) for _, t := range tracesToCut { - model.SortTrace(t.trace) + model.SortTraceBytes(t.traceBytes) - out, err := proto.Marshal(t.trace) + out, err := proto.Marshal(t.traceBytes) if err != nil { return err } @@ -134,6 +164,10 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error return err } i.bytesWrittenTotal.Add(float64(len(out))) + + // return trace byte slices to be reused by proto marshalling + // WARNING: can't reuse traceid's b/c the appender takes ownership of byte slices that are passed to it + tempopb.ReuseTraceBytes(t.traceBytes) } return nil @@ -261,14 +295,14 @@ func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error return err } -func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) { +func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace, error) { var err error var allBytes []byte // live traces i.tracesMtx.Lock() if liveTrace, ok := i.traces[i.tokenForTraceID(id)]; ok { - allBytes, err = proto.Marshal(liveTrace.trace) // todo(jpe) : handle this when marshalling the new format + allBytes, err = proto.Marshal(liveTrace.traceBytes) if err != nil { i.tracesMtx.Unlock() return nil, fmt.Errorf("unable to marshal liveTrace: %w", err) @@ -303,7 +337,7 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) { // completeBlock for _, c := range i.completeBlocks { - foundBytes, err = c.Find(context.TODO(), id) + foundBytes, err = c.Find(ctx, id) if err != nil { return nil, fmt.Errorf("completeBlock.Find failed: %w", err) } @@ -338,25 +372,20 @@ func (i *instance) AddCompletingBlock(b *wal.AppendBlock) { // getOrCreateTrace will return a new trace object for the given request // It must be called under the i.tracesMtx lock -func (i *instance) getOrCreateTrace(req *tempopb.PushRequest) (*trace, error) { - traceID, err := pushRequestTraceID(req) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "unable to extract traceID: %v", err) - } - +func (i *instance) getOrCreateTrace(traceID []byte) *trace { fp := i.tokenForTraceID(traceID) trace, ok := i.traces[fp] if ok { - return trace, nil + return trace } maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID) - trace = newTrace(maxBytes, fp, traceID) + trace = newTrace(maxBytes, traceID) i.traces[fp] = trace i.tracesCreatedTotal.Inc() i.traceCount.Inc() - return trace, nil + return trace } // tokenForTraceID hash trace ID, should be called under lock diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index 0f478b02cc0..2c5932010fb 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/storage" + "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb" @@ -85,37 +86,6 @@ func TestInstance(t *testing.T) { assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) } -func pushAndQuery(t *testing.T, i *instance, request *tempopb.PushRequest) uuid.UUID { - traceID := test.MustTraceID(request) - err := i.Push(context.Background(), request) - assert.NoError(t, err) - assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - - trace, err := i.FindTraceByID(traceID) - assert.NotNil(t, trace) - assert.NoError(t, err) - - err = i.CutCompleteTraces(0, true) - assert.NoError(t, err) - assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - - trace, err = i.FindTraceByID(traceID) - assert.NotNil(t, trace) - assert.NoError(t, err) - - blockID, err := i.CutBlockIfReady(0, 0, false) - assert.NoError(t, err, "unexpected error cutting block") - assert.NotEqual(t, blockID, uuid.Nil) - - trace, err = i.FindTraceByID(traceID) - assert.NotNil(t, trace) - assert.NoError(t, err) - - assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - - return blockID -} - func TestInstanceFind(t *testing.T) { limits, err := overrides.NewOverrides(overrides.Limits{}) assert.NoError(t, err, "unexpected error creating limits") @@ -129,23 +99,73 @@ func TestInstanceFind(t *testing.T) { i, err := newInstance("fake", limiter, ingester.store, ingester.local) assert.NoError(t, err, "unexpected error creating new instance") - request := test.MakeRequest(10, []byte{}) - blockID := pushAndQuery(t, i, request) + numTraces := 500 + ids := [][]byte{} + traces := []*tempopb.Trace{} + for j := 0; j < numTraces; j++ { + id := make([]byte, 16) + rand.Read(id) + + trace := test.MakeTrace(10, id) + model.SortTrace(trace) + traceBytes, err := trace.Marshal() + require.NoError(t, err) + + err = i.PushBytes(context.Background(), id, traceBytes) + require.NoError(t, err) + assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) + + ids = append(ids, id) + traces = append(traces, trace) + } + + queryAll(t, i, ids, traces) + + err = i.CutCompleteTraces(0, true) + require.NoError(t, err) + assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) + + for j := 0; j < numTraces; j++ { + traceBytes, err := traces[j].Marshal() + require.NoError(t, err) + + err = i.PushBytes(context.Background(), ids[j], traceBytes) + require.NoError(t, err) + } + + queryAll(t, i, ids, traces) + + blockID, err := i.CutBlockIfReady(0, 0, true) + require.NoError(t, err) + assert.NotEqual(t, blockID, uuid.Nil) - // make another completingBlock - request2 := test.MakeRequest(10, []byte{}) - pushAndQuery(t, i, request2) - assert.Len(t, i.completingBlocks, 2) + queryAll(t, i, ids, traces) err = i.CompleteBlock(blockID) - assert.NoError(t, err, "unexpected error completing block") + require.NoError(t, err) - assert.Len(t, i.completingBlocks, 2) + queryAll(t, i, ids, traces) - traceID := test.MustTraceID(request) - trace, err := i.FindTraceByID(traceID) - assert.NotNil(t, trace) - assert.NoError(t, err) + err = i.ClearCompletingBlock(blockID) + require.NoError(t, err) + + queryAll(t, i, ids, traces) + + localBlock := i.GetBlockToBeFlushed(blockID) + require.NotNil(t, localBlock) + + err = ingester.store.WriteBlock(context.Background(), localBlock) + require.NoError(t, err) + + queryAll(t, i, ids, traces) +} + +func queryAll(t *testing.T, i *instance, ids [][]byte, traces []*tempopb.Trace) { + for j, id := range ids { + trace, err := i.FindTraceByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, traces[j], trace) + } } func TestInstanceDoesNotRace(t *testing.T) { @@ -203,7 +223,7 @@ func TestInstanceDoesNotRace(t *testing.T) { }) go concurrent(func() { - _, err := i.FindTraceByID([]byte{0x01}) + _, err := i.FindTraceByID(context.Background(), []byte{0x01}) assert.NoError(t, err, "error finding trace by id") }) @@ -321,10 +341,10 @@ func TestInstanceCutCompleteTraces(t *testing.T) { id := make([]byte, 16) rand.Read(id) - tracepb := test.MakeTrace(10, id) + tracepb := test.MakeTraceBytes(10, id) pastTrace := &trace{ traceID: id, - trace: tracepb, + traceBytes: tracepb, lastAppend: time.Now().Add(-time.Hour), } @@ -332,7 +352,7 @@ func TestInstanceCutCompleteTraces(t *testing.T) { rand.Read(id) nowTrace := &trace{ traceID: id, - trace: tracepb, + traceBytes: tracepb, lastAppend: time.Now().Add(time.Hour), } @@ -562,7 +582,7 @@ func BenchmarkInstanceFindTraceByID(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - trace, err := instance.FindTraceByID(traceID) + trace, err := instance.FindTraceByID(context.Background(), traceID) assert.NotNil(b, trace) assert.NoError(b, err) } diff --git a/modules/ingester/trace.go b/modules/ingester/trace.go index 0734d1bab39..aee4297b0a3 100644 --- a/modules/ingester/trace.go +++ b/modules/ingester/trace.go @@ -11,28 +11,28 @@ import ( ) type trace struct { - trace *tempopb.Trace - token uint32 + traceBytes *tempopb.TraceBytes lastAppend time.Time traceID []byte maxBytes int currentBytes int } -func newTrace(maxBytes int, token uint32, traceID []byte) *trace { +func newTrace(maxBytes int, traceID []byte) *trace { return &trace{ - token: token, - trace: &tempopb.Trace{}, + traceBytes: &tempopb.TraceBytes{ + Traces: make([][]byte, 0, 10), // 10 for luck + }, lastAppend: time.Now(), traceID: traceID, maxBytes: maxBytes, } } -func (t *trace) Push(_ context.Context, req *tempopb.PushRequest) error { +func (t *trace) Push(_ context.Context, trace []byte) error { t.lastAppend = time.Now() if t.maxBytes != 0 { - reqSize := req.Size() + reqSize := len(trace) if t.currentBytes+reqSize > t.maxBytes { return status.Errorf(codes.FailedPrecondition, "%s max size of trace (%d) exceeded while adding %d bytes", overrides.ErrorPrefixTraceTooLarge, t.maxBytes, reqSize) } @@ -40,7 +40,7 @@ func (t *trace) Push(_ context.Context, req *tempopb.PushRequest) error { t.currentBytes += reqSize } - t.trace.Batches = append(t.trace.Batches, req.Batch) + t.traceBytes.Traces = append(t.traceBytes.Traces, trace) return nil } diff --git a/pkg/model/trace.go b/pkg/model/combine.go similarity index 67% rename from pkg/model/trace.go rename to pkg/model/combine.go index 27d69a1c4c7..901124cad68 100644 --- a/pkg/model/trace.go +++ b/pkg/model/combine.go @@ -3,26 +3,39 @@ package model import ( "bytes" "encoding/binary" + "fmt" "hash" "hash/fnv" - "sort" + "github.com/go-kit/kit/log/level" + "github.com/grafana/tempo/pkg/tempopb" "github.com/pkg/errors" - "github.com/grafana/tempo/pkg/tempopb" - v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/cortexproject/cortex/pkg/util/log" ) -// todo(jpe): -// - add cross data encoding tests -// - extend benchmarks +type objectCombiner struct{} + +var ObjectCombiner = objectCombiner{} + +// Combine implements tempodb/encoding/common.ObjectCombiner +func (o objectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) { + combinedTrace, wasCombined, err := CombineTraceBytes(objA, objB, dataEncoding, dataEncoding) + if err != nil { + level.Error(log.Logger).Log("msg", "error combining trace protos", "err", err.Error()) + } + return combinedTrace, wasCombined +} -// CombineTraceBytes combines objA and objB encoded using dataEncodingA and dataEncodingB in dataEncodingA +// CombineTraceBytes combines objA and objB encoded using dataEncodingA and dataEncodingB and returns a trace encoded with dataEncodingA func CombineTraceBytes(objA []byte, objB []byte, dataEncodingA string, dataEncodingB string) (_ []byte, wasCombined bool, _ error) { // if the byte arrays are the same, we can return quickly if bytes.Equal(objA, objB) { return objA, false, nil } + if objB == nil { + return objA, false, nil + } // bytes differ. unmarshal and combine traces traceA, errA := Unmarshal(objA, dataEncodingA) @@ -30,18 +43,23 @@ func CombineTraceBytes(objA []byte, objB []byte, dataEncodingA string, dataEncod // if we had problems unmarshaling one or the other, return the one that marshalled successfully if errA != nil && errB == nil { - return objB, false, errors.Wrap(errA, "error unsmarshaling objA") + if dataEncodingA != dataEncodingB { + // have to convert objB to dataEncodingA + bytes, _ := marshal(traceB, dataEncodingA) + return bytes, false, fmt.Errorf("error unsmarshaling objA (%s): %w", dataEncodingA, errA) + } + return objB, false, fmt.Errorf("error unsmarshaling objA (%s): %w", dataEncodingA, errA) } else if errB != nil && errA == nil { - return objA, false, errors.Wrap(errB, "error unsmarshaling objB") + return objA, false, fmt.Errorf("error unsmarshaling objB (%s): %w", dataEncodingB, errB) } else if errA != nil && errB != nil { // if both failed let's send back an empty trace - bytes, _ := Marshal(&tempopb.Trace{}, dataEncodingA) - return bytes, false, errors.Wrap(errA, "both A and B failed to unmarshal. returning an empty trace") + bytes, _ := marshal(&tempopb.Trace{}, dataEncodingA) + return bytes, false, fmt.Errorf("both A (%s) and B (%s) failed to unmarshal. returning an empty trace", dataEncodingA, dataEncodingB) } traceComplete, _, _, _ := CombineTraceProtos(traceA, traceB) - bytes, err := Marshal(traceComplete, dataEncodingA) + bytes, err := marshal(traceComplete, dataEncodingA) if err != nil { return objA, true, errors.Wrap(err, "marshalling the combine trace threw an error") } @@ -114,47 +132,6 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int return traceA, spanCountA, spanCountB, spanCountTotal } -func SortTrace(t *tempopb.Trace) { - // Sort bottom up by span start times - for _, b := range t.Batches { - for _, ils := range b.InstrumentationLibrarySpans { - sort.Slice(ils.Spans, func(i, j int) bool { - return compareSpans(ils.Spans[i], ils.Spans[j]) - }) - } - sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool { - return compareIls(b.InstrumentationLibrarySpans[i], b.InstrumentationLibrarySpans[j]) - }) - } - sort.Slice(t.Batches, func(i, j int) bool { - return compareBatches(t.Batches[i], t.Batches[j]) - }) -} - -func compareBatches(a *v1.ResourceSpans, b *v1.ResourceSpans) bool { - if len(a.InstrumentationLibrarySpans) > 0 && len(b.InstrumentationLibrarySpans) > 0 { - return compareIls(a.InstrumentationLibrarySpans[0], b.InstrumentationLibrarySpans[0]) - } - return false -} - -func compareIls(a *v1.InstrumentationLibrarySpans, b *v1.InstrumentationLibrarySpans) bool { - if len(a.Spans) > 0 && len(b.Spans) > 0 { - return compareSpans(a.Spans[0], b.Spans[0]) - } - return false -} - -func compareSpans(a *v1.Span, b *v1.Span) bool { - // Sort by start time, then id - - if a.StartTimeUnixNano == b.StartTimeUnixNano { - return bytes.Compare(a.SpanId, b.SpanId) == -1 - } - - return a.StartTimeUnixNano < b.StartTimeUnixNano -} - // tokenForID returns a uint32 token for use in a hash map given a span id and span kind // buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function // kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique diff --git a/pkg/model/trace_test.go b/pkg/model/combine_test.go similarity index 51% rename from pkg/model/trace_test.go rename to pkg/model/combine_test.go index 1b086d6f6c8..b443673ba5d 100644 --- a/pkg/model/trace_test.go +++ b/pkg/model/combine_test.go @@ -1,15 +1,13 @@ package model import ( - "bytes" "fmt" "hash/fnv" "math/rand" "testing" - "github.com/golang/protobuf/proto" + "github.com/gogo/protobuf/proto" "github.com/grafana/tempo/pkg/tempopb" - v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -22,11 +20,6 @@ func TestCombine(t *testing.T) { SortTrace(t1) SortTrace(t2) - b1, err := proto.Marshal(t1) - assert.NoError(t, err) - b2, err := proto.Marshal(t2) - assert.NoError(t, err) - // split t2 into two traces t2a := &tempopb.Trace{} t2b := &tempopb.Trace{} @@ -38,70 +31,115 @@ func TestCombine(t *testing.T) { } } - b2a, err := proto.Marshal(t2a) - assert.NoError(t, err) - b2b, err := proto.Marshal(t2b) - assert.NoError(t, err) - tests := []struct { - trace1 []byte - trace2 []byte - expected []byte - errString string + name string + trace1 *tempopb.Trace + trace2 *tempopb.Trace + expected *tempopb.Trace + expectError bool }{ { - trace1: b1, - trace2: b1, - expected: b1, + name: "same trace", + trace1: t1, + trace2: t1, + expected: t1, }, { - trace1: b1, - trace2: []byte{0x01}, - expected: b1, - errString: "error unsmarshaling objB: proto: Trace: illegal tag 0 (wire type 1)", + name: "t2 is bad", + trace1: t1, + trace2: nil, + expected: t1, + expectError: true, }, { - trace1: []byte{0x01}, - trace2: b2, - expected: b2, - errString: "error unsmarshaling objA: proto: Trace: illegal tag 0 (wire type 1)", + name: "t1 is bad", + trace1: nil, + trace2: t2, + expected: t2, + expectError: true, }, { - trace1: []byte{0x01, 0x02, 0x03}, - trace2: []byte{0x01, 0x02, 0x03}, - expected: []byte{0x01, 0x02, 0x03}, + name: "combine trace", + trace1: t2a, + trace2: t2b, + expected: t2, }, { - trace1: b2a, - trace2: b2b, - expected: b2, - }, - { - trace1: []byte{0x01}, - trace2: []byte{0x02}, - expected: nil, - errString: "both A and B failed to unmarshal. returning an empty trace: proto: Trace: illegal tag 0 (wire type 1)", + name: "both bad", + trace1: nil, + trace2: nil, + expected: nil, + expectError: true, }, } for _, tt := range tests { - actual, _, err := CombineTraceBytes(tt.trace1, tt.trace2, "", "") - if len(tt.errString) > 0 { - assert.EqualError(t, err, tt.errString) - } else { - assert.NoError(t, err) + for _, enc1 := range allEncodings { + for _, enc2 := range allEncodings { + t.Run(fmt.Sprintf("%s:%s:%s", tt.name, enc1, enc2), func(t *testing.T) { + var b1 []byte + var b2 []byte + if tt.trace1 != nil { // nil means substitute garbage data + b1 = mustMarshal(tt.trace1, enc1) + } else { + b1 = []byte{0x01, 0x02} + } + if tt.trace2 != nil { // nil means substitute garbage data + b2 = mustMarshal(tt.trace2, enc2) + } else { + b2 = []byte{0x01, 0x02, 0x03} + } + + actual, _, err := CombineTraceBytes(b1, b2, enc1, enc2) + if tt.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + if tt.expected != nil { + expected := mustMarshal(tt.expected, enc1) + assert.Equal(t, expected, actual) + } + }) + } } + } +} - if !bytes.Equal(tt.expected, actual) { - actualTrace := &tempopb.Trace{} - expectedTrace := &tempopb.Trace{} - - err = proto.Unmarshal(tt.expected, expectedTrace) - assert.NoError(t, err) - err = proto.Unmarshal(actual, actualTrace) - assert.NoError(t, err) - - assert.Equal(t, expectedTrace, actualTrace) +func TestCombineNils(t *testing.T) { + test := test.MakeTrace(1, nil) + SortTrace(test) + + for _, enc1 := range allEncodings { + for _, enc2 := range allEncodings { + t.Run(fmt.Sprintf("%s:%s", enc1, enc2), func(t *testing.T) { + // both nil + actualBytes, _, err := CombineTraceBytes(nil, nil, enc1, enc2) + require.NoError(t, err) + assert.Equal(t, []byte(nil), actualBytes) + + testBytes1, err := marshal(test, enc1) + require.NoError(t, err) + testBytes2, err := marshal(test, enc2) + require.NoError(t, err) + + // objB nil + actualBytes, _, err = CombineTraceBytes(testBytes1, nil, enc1, enc2) + require.NoError(t, err) + + actual, err := Unmarshal(actualBytes, enc1) + require.NoError(t, err) + assert.Equal(t, test, actual) + + // objA nil + actualBytes, _, err = CombineTraceBytes(nil, testBytes2, enc1, enc2) + require.NoError(t, err) + + actual, err = Unmarshal(actualBytes, enc1) + require.NoError(t, err) + assert.Equal(t, test, actual) + }) } } } @@ -204,90 +242,6 @@ func BenchmarkCombineTraceProtos(b *testing.B) { } } -func TestSortTrace(t *testing.T) { - tests := []struct { - input *tempopb.Trace - expected *tempopb.Trace - }{ - { - input: &tempopb.Trace{}, - expected: &tempopb.Trace{}, - }, - - { - input: &tempopb.Trace{ - Batches: []*v1.ResourceSpans{ - { - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - StartTimeUnixNano: 2, - }, - }, - }, - }, - }, - { - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - StartTimeUnixNano: 1, - }, - }, - }, - }, - }, - }, - }, - expected: &tempopb.Trace{ - Batches: []*v1.ResourceSpans{ - { - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - StartTimeUnixNano: 1, - }, - }, - }, - }, - }, - { - InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ - { - Spans: []*v1.Span{ - { - StartTimeUnixNano: 2, - }, - }, - }, - }, - }, - }, - }, - }, - } - - for _, tt := range tests { - SortTrace(tt.input) - - assert.Equal(t, tt.expected, tt.input) - } -} - -func TestUnmarshal(t *testing.T) { - trace := test.MakeTrace(100, nil) - bytes, err := proto.Marshal(trace) - require.NoError(t, err) - - actual, err := Unmarshal(bytes, CurrentEncoding) - require.NoError(t, err) - - assert.True(t, proto.Equal(trace, actual)) -} - func BenchmarkTokenForID(b *testing.B) { h := fnv.New32() id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} @@ -298,3 +252,12 @@ func BenchmarkTokenForID(b *testing.B) { _ = tokenForID(h, buffer, 0, id) } } + +func mustMarshal(trace *tempopb.Trace, encoding string) []byte { + b, err := marshal(trace, encoding) + if err != nil { + panic(err) + } + + return b +} diff --git a/pkg/model/combiner.go b/pkg/model/combiner.go deleted file mode 100644 index 45d1935d2cf..00000000000 --- a/pkg/model/combiner.go +++ /dev/null @@ -1,20 +0,0 @@ -package model - -import ( - "github.com/go-kit/kit/log/level" - - "github.com/cortexproject/cortex/pkg/util/log" -) - -type objectCombiner struct{} - -var ObjectCombiner = objectCombiner{} - -// Combine implements tempodb/encoding/common.ObjectCombiner -func (o objectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) { - combinedTrace, wasCombined, err := CombineTraceBytes(objA, objB, dataEncoding, dataEncoding) - if err != nil { - level.Error(log.Logger).Log("msg", "error combining trace protos", "err", err.Error()) - } - return combinedTrace, wasCombined -} diff --git a/pkg/model/encoding.go b/pkg/model/encoding.go index a4ecb88ee7e..5fde3f54cce 100644 --- a/pkg/model/encoding.go +++ b/pkg/model/encoding.go @@ -1,27 +1,78 @@ package model import ( + "fmt" + "github.com/grafana/tempo/pkg/tempopb" "github.com/gogo/protobuf/proto" ) // CurrentEncoding is a string representing the encoding that all new blocks should be created with -const CurrentEncoding = "" +// "" = tempopb.Trace +// "v1" = tempopb.TraceBytes +const CurrentEncoding = "v1" // TracePBEncoding is a string that represents the original TracePBEncoding. Pass this if you know that the // bytes are encoded *tracepb.Trace const TracePBEncoding = "" +// allEncodings is used for testing +var allEncodings = []string{ + CurrentEncoding, + TracePBEncoding, +} + // Unmarshal converts a byte slice of the passed encoding into a *tempopb.Trace func Unmarshal(obj []byte, dataEncoding string) (*tempopb.Trace, error) { trace := &tempopb.Trace{} - err := proto.Unmarshal(obj, trace) - return trace, err + + switch dataEncoding { + case "": + err := proto.Unmarshal(obj, trace) + if err != nil { + return nil, err + } + case "v1": + traceBytes := &tempopb.TraceBytes{} + err := proto.Unmarshal(obj, traceBytes) + if err != nil { + return nil, err + } + + for _, bytes := range traceBytes.Traces { + innerTrace := &tempopb.Trace{} + err = proto.Unmarshal(bytes, innerTrace) + if err != nil { + return nil, err + } + + trace.Batches = append(trace.Batches, innerTrace.Batches...) + } + default: + return nil, fmt.Errorf("unrecognized dataEncoding in Unmarshal %s", dataEncoding) + } + + return trace, nil } -// Marshal converts a tempopb.Trace into a byte slice encoded using dataEncoding -// nolint: interfacer -func Marshal(trace *tempopb.Trace, dataEncoding string) ([]byte, error) { - return proto.Marshal(trace) +// marshal converts a tempopb.Trace into a byte slice encoded using dataEncoding +// nolint:interfacer +func marshal(trace *tempopb.Trace, dataEncoding string) ([]byte, error) { + switch dataEncoding { + case "": + return proto.Marshal(trace) + case "v1": + traceBytes := &tempopb.TraceBytes{} + bytes, err := proto.Marshal(trace) + if err != nil { + return nil, err + } + + traceBytes.Traces = append(traceBytes.Traces, bytes) + + return proto.Marshal(traceBytes) + default: + return nil, fmt.Errorf("unrecognized dataEncoding in Unmarshal %s", dataEncoding) + } } diff --git a/pkg/model/encoding_test.go b/pkg/model/encoding_test.go new file mode 100644 index 00000000000..71def14f835 --- /dev/null +++ b/pkg/model/encoding_test.go @@ -0,0 +1,33 @@ +package model + +import ( + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnmarshal(t *testing.T) { + empty := &tempopb.Trace{} + + for _, e := range allEncodings { + trace := test.MakeTrace(100, nil) + bytes, err := marshal(trace, e) + require.NoError(t, err) + + actual, err := Unmarshal(bytes, e) + require.NoError(t, err) + assert.True(t, proto.Equal(trace, actual)) + + actual, err = Unmarshal(nil, e) + assert.NoError(t, err) + assert.True(t, proto.Equal(empty, actual)) + + actual, err = Unmarshal([]byte{}, e) + assert.NoError(t, err) + assert.True(t, proto.Equal(empty, actual)) + } +} diff --git a/pkg/model/sort.go b/pkg/model/sort.go new file mode 100644 index 00000000000..38de0b50745 --- /dev/null +++ b/pkg/model/sort.go @@ -0,0 +1,60 @@ +package model + +import ( + "bytes" + "sort" + + "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" +) + +// SortTrace sorts a *tempopb.Trace +func SortTrace(t *tempopb.Trace) { + // Sort bottom up by span start times + for _, b := range t.Batches { + for _, ils := range b.InstrumentationLibrarySpans { + sort.Slice(ils.Spans, func(i, j int) bool { + return compareSpans(ils.Spans[i], ils.Spans[j]) + }) + } + sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool { + return compareIls(b.InstrumentationLibrarySpans[i], b.InstrumentationLibrarySpans[j]) + }) + } + sort.Slice(t.Batches, func(i, j int) bool { + return compareBatches(t.Batches[i], t.Batches[j]) + }) +} + +func compareBatches(a *v1.ResourceSpans, b *v1.ResourceSpans) bool { + if len(a.InstrumentationLibrarySpans) > 0 && len(b.InstrumentationLibrarySpans) > 0 { + return compareIls(a.InstrumentationLibrarySpans[0], b.InstrumentationLibrarySpans[0]) + } + return false +} + +func compareIls(a *v1.InstrumentationLibrarySpans, b *v1.InstrumentationLibrarySpans) bool { + if len(a.Spans) > 0 && len(b.Spans) > 0 { + return compareSpans(a.Spans[0], b.Spans[0]) + } + return false +} + +func compareSpans(a *v1.Span, b *v1.Span) bool { + // Sort by start time, then id + if a.StartTimeUnixNano == b.StartTimeUnixNano { + return bytes.Compare(a.SpanId, b.SpanId) == -1 + } + + return a.StartTimeUnixNano < b.StartTimeUnixNano +} + +// SortTraceBytes sorts a *tempopb.TraceBytes +func SortTraceBytes(t *tempopb.TraceBytes) { + sort.Slice(t.Traces, func(i, j int) bool { + traceI := t.Traces[i] + traceJ := t.Traces[j] + + return bytes.Compare(traceI, traceJ) == -1 + }) +} diff --git a/pkg/model/sort_test.go b/pkg/model/sort_test.go new file mode 100644 index 00000000000..4817149e10b --- /dev/null +++ b/pkg/model/sort_test.go @@ -0,0 +1,140 @@ +package model + +import ( + "math/rand" + "testing" + + "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSortTrace(t *testing.T) { + tests := []struct { + input *tempopb.Trace + expected *tempopb.Trace + }{ + { + input: &tempopb.Trace{}, + expected: &tempopb.Trace{}, + }, + + { + input: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + StartTimeUnixNano: 2, + }, + }, + }, + }, + }, + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + StartTimeUnixNano: 1, + }, + }, + }, + }, + }, + }, + }, + expected: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + StartTimeUnixNano: 1, + }, + }, + }, + }, + }, + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + StartTimeUnixNano: 2, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + SortTrace(tt.input) + + assert.Equal(t, tt.expected, tt.input) + } +} + +func TestSortTraceBytes(t *testing.T) { + numTraces := 100 + + // create first trace + traceBytes := &tempopb.TraceBytes{ + Traces: make([][]byte, numTraces), + } + for i := range traceBytes.Traces { + traceBytes.Traces[i] = make([]byte, rand.Intn(10)) + _, err := rand.Read(traceBytes.Traces[i]) + require.NoError(t, err) + } + + // dupe + traceBytes2 := &tempopb.TraceBytes{ + Traces: make([][]byte, numTraces), + } + for i := range traceBytes.Traces { + traceBytes2.Traces[i] = make([]byte, len(traceBytes.Traces[i])) + copy(traceBytes2.Traces[i], traceBytes.Traces[i]) + } + + // randomize dupe + rand.Shuffle(len(traceBytes2.Traces), func(i, j int) { + traceBytes2.Traces[i], traceBytes2.Traces[j] = traceBytes2.Traces[j], traceBytes2.Traces[i] + }) + + assert.NotEqual(t, traceBytes, traceBytes2) + + // sort and compare + SortTraceBytes(traceBytes) + SortTraceBytes(traceBytes2) + + assert.Equal(t, traceBytes, traceBytes2) +} + +func BenchmarkSortTraceBytes(b *testing.B) { + numTraces := 100 + + traceBytes := &tempopb.TraceBytes{ + Traces: make([][]byte, numTraces), + } + for i := range traceBytes.Traces { + traceBytes.Traces[i] = make([]byte, rand.Intn(10)) + _, err := rand.Read(traceBytes.Traces[i]) + require.NoError(b, err) + } + + for i := 0; i < b.N; i++ { + rand.Shuffle(len(traceBytes.Traces), func(i, j int) { + traceBytes.Traces[i], traceBytes.Traces[j] = traceBytes.Traces[j], traceBytes.Traces[i] + }) + SortTraceBytes(traceBytes) + } +} diff --git a/pkg/tempopb/prealloc.go b/pkg/tempopb/prealloc.go index ef2b09460cd..6bdd07d4486 100644 --- a/pkg/tempopb/prealloc.go +++ b/pkg/tempopb/prealloc.go @@ -9,38 +9,37 @@ var ( bytePool = pool.New(500, 16_000, 2, func(size int) interface{} { return make([]byte, 0, size) }) ) -// PreallocRequest is a (repeated bytes requests) which preallocs slices on Unmarshal. -type PreallocRequest struct { - Request []byte +// PreallocBytes is a (repeated bytes slices) which preallocs slices on Unmarshal. +type PreallocBytes struct { + Slice []byte } // Unmarshal implements proto.Message. -func (r *PreallocRequest) Unmarshal(dAtA []byte) error { - r.Request = bytePool.Get(len(dAtA)).([]byte) - r.Request = r.Request[:len(dAtA)] - copy(r.Request, dAtA) +func (r *PreallocBytes) Unmarshal(dAtA []byte) error { + r.Slice = bytePool.Get(len(dAtA)).([]byte) + r.Slice = r.Slice[:len(dAtA)] + copy(r.Slice, dAtA) return nil } // MarshalTo implements proto.Marshaller. // returned int is not used -func (r *PreallocRequest) MarshalTo(dAtA []byte) (int, error) { - copy(dAtA[:], r.Request[:]) - return len(r.Request), nil +func (r *PreallocBytes) MarshalTo(dAtA []byte) (int, error) { + copy(dAtA[:], r.Slice[:]) + return len(r.Slice), nil } // Size implements proto.Sizer. -func (r *PreallocRequest) Size() (n int) { +func (r *PreallocBytes) Size() (n int) { if r == nil { return 0 } - return len(r.Request) + return len(r.Slice) } -// ReuseRequest puts the byte slice back into bytePool for reuse. -func ReuseRequest(req *PushBytesRequest) { - for i := range req.Requests { - // We want to preserve the underlying allocated memory, [:0] helps us retains the cap() of the slice - bytePool.Put(req.Requests[i].Request[:0]) +// ReuseTraceBytes puts the byte slice back into bytePool for reuse. +func ReuseTraceBytes(trace *TraceBytes) { + for _, t := range trace.Traces { + bytePool.Put(t[:0]) } } diff --git a/pkg/tempopb/prealloc_test.go b/pkg/tempopb/prealloc_test.go index 9daaeef8106..10eac33dd4a 100644 --- a/pkg/tempopb/prealloc_test.go +++ b/pkg/tempopb/prealloc_test.go @@ -11,29 +11,29 @@ func TestUnmarshal(t *testing.T) { var dummyData = make([]byte, 10) rand.Read(dummyData) - preallocReq := &PreallocRequest{} + preallocReq := &PreallocBytes{} err := preallocReq.Unmarshal(dummyData) assert.NoError(t, err) - assert.Equal(t, dummyData, preallocReq.Request) + assert.Equal(t, dummyData, preallocReq.Slice) } func TestMarshal(t *testing.T) { - preallocReq := &PreallocRequest{ - Request: make([]byte, 10), + preallocReq := &PreallocBytes{ + Slice: make([]byte, 10), } - rand.Read(preallocReq.Request) + rand.Read(preallocReq.Slice) var dummyData = make([]byte, 10) _, err := preallocReq.MarshalTo(dummyData) assert.NoError(t, err) - assert.Equal(t, preallocReq.Request, dummyData) + assert.Equal(t, preallocReq.Slice, dummyData) } func TestSize(t *testing.T) { - preallocReq := &PreallocRequest{ - Request: make([]byte, 10), + preallocReq := &PreallocBytes{ + Slice: make([]byte, 10), } assert.Equal(t, 10, preallocReq.Size()) } diff --git a/pkg/tempopb/tempo.pb.go b/pkg/tempopb/tempo.pb.go index 47e261b93d5..f6d9beb0595 100644 --- a/pkg/tempopb/tempo.pb.go +++ b/pkg/tempopb/tempo.pb.go @@ -28,6 +28,7 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package +// Read type TraceByIDRequest struct { TraceID []byte `protobuf:"bytes,1,opt,name=traceID,proto3" json:"traceID,omitempty"` BlockStart string `protobuf:"bytes,2,opt,name=blockStart,proto3" json:"blockStart,omitempty"` @@ -184,6 +185,7 @@ func (m *Trace) GetBatches() []*v1.ResourceSpans { return nil } +// Write type PushRequest struct { Batch *v1.ResourceSpans `protobuf:"bytes,1,opt,name=batch,proto3" json:"batch,omitempty"` } @@ -265,8 +267,12 @@ func (m *PushResponse) XXX_DiscardUnknown() { var xxx_messageInfo_PushResponse proto.InternalMessageInfo type PushBytesRequest struct { - // pre-serialized PushRequests - Requests []PreallocRequest `protobuf:"bytes,1,rep,name=requests,proto3,customtype=PreallocRequest" json:"requests"` + // pre-marshalled PushRequests + Requests []PreallocBytes `protobuf:"bytes,1,rep,name=requests,proto3,customtype=PreallocBytes" json:"requests"` // Deprecated: Do not use. + // pre-marshalled Traces. length must match ids + Traces []PreallocBytes `protobuf:"bytes,2,rep,name=traces,proto3,customtype=PreallocBytes" json:"traces"` + // trace ids. length must match traces + Ids []PreallocBytes `protobuf:"bytes,3,rep,name=ids,proto3,customtype=PreallocBytes" json:"ids"` } func (m *PushBytesRequest) Reset() { *m = PushBytesRequest{} } @@ -302,6 +308,51 @@ func (m *PushBytesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_PushBytesRequest proto.InternalMessageInfo +type TraceBytes struct { + // pre-marshalled Traces + Traces [][]byte `protobuf:"bytes,1,rep,name=traces,proto3" json:"traces,omitempty"` +} + +func (m *TraceBytes) Reset() { *m = TraceBytes{} } +func (m *TraceBytes) String() string { return proto.CompactTextString(m) } +func (*TraceBytes) ProtoMessage() {} +func (*TraceBytes) Descriptor() ([]byte, []int) { + return fileDescriptor_f22805646f4f62b6, []int{6} +} +func (m *TraceBytes) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TraceBytes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TraceBytes.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TraceBytes) XXX_Merge(src proto.Message) { + xxx_messageInfo_TraceBytes.Merge(m, src) +} +func (m *TraceBytes) XXX_Size() int { + return m.Size() +} +func (m *TraceBytes) XXX_DiscardUnknown() { + xxx_messageInfo_TraceBytes.DiscardUnknown(m) +} + +var xxx_messageInfo_TraceBytes proto.InternalMessageInfo + +func (m *TraceBytes) GetTraces() [][]byte { + if m != nil { + return m.Traces + } + return nil +} + func init() { proto.RegisterType((*TraceByIDRequest)(nil), "tempopb.TraceByIDRequest") proto.RegisterType((*TraceByIDResponse)(nil), "tempopb.TraceByIDResponse") @@ -309,39 +360,43 @@ func init() { proto.RegisterType((*PushRequest)(nil), "tempopb.PushRequest") proto.RegisterType((*PushResponse)(nil), "tempopb.PushResponse") proto.RegisterType((*PushBytesRequest)(nil), "tempopb.PushBytesRequest") + proto.RegisterType((*TraceBytes)(nil), "tempopb.TraceBytes") } func init() { proto.RegisterFile("pkg/tempopb/tempo.proto", fileDescriptor_f22805646f4f62b6) } var fileDescriptor_f22805646f4f62b6 = []byte{ - // 435 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0x4d, 0x6f, 0xd3, 0x30, - 0x18, 0x8e, 0x19, 0x59, 0xd6, 0xb7, 0x65, 0x0c, 0x6b, 0x68, 0x21, 0x42, 0x69, 0x15, 0x71, 0xe8, - 0x85, 0x44, 0xcb, 0xb4, 0xc3, 0x4e, 0x48, 0x51, 0xf9, 0xd8, 0x01, 0x69, 0xa4, 0xfc, 0x81, 0x24, - 0x35, 0x69, 0xb5, 0x2e, 0xce, 0x6c, 0xa7, 0x52, 0x6f, 0x9c, 0x38, 0xf3, 0xb3, 0x7a, 0xec, 0x11, - 0x71, 0xa8, 0x50, 0xfb, 0x47, 0x50, 0xec, 0x24, 0x84, 0x0a, 0xb4, 0x53, 0xde, 0xe7, 0xc3, 0x8f, - 0x9f, 0xd8, 0x86, 0xb3, 0xfc, 0x36, 0xf5, 0x04, 0xb9, 0xcb, 0x69, 0x1e, 0xab, 0xaf, 0x9b, 0x33, - 0x2a, 0x28, 0x36, 0x2a, 0xd2, 0x3a, 0x15, 0x2c, 0x4a, 0x88, 0xb7, 0x38, 0xf7, 0xe4, 0xa0, 0x64, - 0xeb, 0x75, 0x3a, 0x13, 0xd3, 0x22, 0x76, 0x13, 0x7a, 0xe7, 0xa5, 0x34, 0xa5, 0x9e, 0xa4, 0xe3, - 0xe2, 0x8b, 0x44, 0x12, 0xc8, 0x49, 0xd9, 0x9d, 0x6f, 0x08, 0x4e, 0x3e, 0x97, 0xcb, 0x83, 0xe5, - 0xf5, 0x28, 0x24, 0xf7, 0x05, 0xe1, 0x02, 0x9b, 0x60, 0xc8, 0xc8, 0xeb, 0x91, 0x89, 0x06, 0x68, - 0xd8, 0x0b, 0x6b, 0x88, 0x6d, 0x80, 0x78, 0x4e, 0x93, 0xdb, 0xb1, 0x88, 0x98, 0x30, 0x1f, 0x0d, - 0xd0, 0xb0, 0x13, 0xb6, 0x18, 0x6c, 0xc1, 0x91, 0x44, 0x6f, 0xb3, 0x89, 0x79, 0x20, 0xd5, 0x06, - 0xe3, 0x97, 0xd0, 0xb9, 0x2f, 0x08, 0x5b, 0x7e, 0xa4, 0x13, 0x62, 0xea, 0x52, 0xfc, 0x43, 0x38, - 0x57, 0xf0, 0xac, 0xd5, 0x83, 0xe7, 0x34, 0xe3, 0x04, 0xbf, 0x02, 0x5d, 0xee, 0x2c, 0x6b, 0x74, - 0xfd, 0x63, 0xb7, 0xfa, 0x77, 0x57, 0x5a, 0x43, 0x25, 0x3a, 0x01, 0xe8, 0x12, 0xe3, 0x2b, 0x30, - 0xe2, 0x48, 0x24, 0x53, 0xc2, 0x4d, 0x34, 0x38, 0x18, 0x76, 0xfd, 0x7e, 0xb3, 0x40, 0x1d, 0xd1, - 0xe2, 0xdc, 0x0d, 0x09, 0xa7, 0x05, 0x4b, 0xc8, 0x38, 0x8f, 0x32, 0x1e, 0xd6, 0x7e, 0x67, 0x04, - 0xdd, 0x9b, 0x82, 0x4f, 0xeb, 0x13, 0xb8, 0x04, 0x5d, 0x2a, 0xd5, 0xc6, 0x0f, 0xe6, 0x28, 0xb7, - 0x73, 0x0c, 0x3d, 0x95, 0xa2, 0xfa, 0x3b, 0xef, 0xe1, 0xa4, 0xc4, 0xc1, 0x52, 0x10, 0x5e, 0x47, - 0x5f, 0xc0, 0x11, 0x53, 0xa3, 0x6a, 0xd9, 0x0b, 0xce, 0x56, 0x9b, 0xbe, 0xf6, 0x73, 0xd3, 0x7f, - 0x7a, 0xc3, 0x48, 0x34, 0x9f, 0xd3, 0xa4, 0xb2, 0x86, 0x8d, 0xd1, 0xff, 0x8a, 0xe0, 0xb0, 0x4c, - 0x22, 0x0c, 0x5f, 0xc2, 0xe3, 0x72, 0xc2, 0xa7, 0x4d, 0xa7, 0x56, 0x71, 0xeb, 0xf9, 0x1e, 0x5b, - 0x15, 0xd1, 0xf0, 0x1b, 0xe8, 0x34, 0x55, 0xf0, 0x8b, 0xbf, 0x5c, 0xed, 0x7a, 0xff, 0x0d, 0xf0, - 0xc7, 0x60, 0x7c, 0x2a, 0x08, 0x9b, 0x11, 0x86, 0x3f, 0xc0, 0x93, 0x77, 0xb3, 0x6c, 0xd2, 0xdc, - 0x57, 0x2b, 0x6f, 0xff, 0x2d, 0x59, 0xd6, 0xbf, 0xa4, 0x3a, 0x34, 0x30, 0x57, 0x5b, 0x1b, 0xad, - 0xb7, 0x36, 0xfa, 0xb5, 0xb5, 0xd1, 0xf7, 0x9d, 0xad, 0xad, 0x77, 0xb6, 0xf6, 0x63, 0x67, 0x6b, - 0xf1, 0xa1, 0x7c, 0x9f, 0x17, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x97, 0x5f, 0xaf, 0xe4, 0x08, + // 483 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0x41, 0x6f, 0xd3, 0x4c, + 0x10, 0xcd, 0x36, 0x5f, 0x92, 0x66, 0x92, 0x56, 0xfd, 0x56, 0x2d, 0x5d, 0x2c, 0xe4, 0x44, 0x56, + 0x25, 0x72, 0xa9, 0xad, 0xa6, 0xea, 0xa1, 0x27, 0x24, 0x2b, 0x20, 0x7a, 0x40, 0x2a, 0x0e, 0x7f, + 0xc0, 0x76, 0x96, 0x24, 0x6a, 0x9a, 0x75, 0x77, 0xd7, 0x95, 0x72, 0xe3, 0xc4, 0x99, 0x7f, 0xc0, + 0xdf, 0xe9, 0xb1, 0x47, 0xc4, 0xa1, 0x42, 0xc9, 0x1f, 0x41, 0x9e, 0xb5, 0x8d, 0xa9, 0x0a, 0x9c, + 0x32, 0xf3, 0xde, 0x9b, 0x37, 0xe3, 0xa7, 0x0d, 0x1c, 0x26, 0x57, 0x53, 0x4f, 0xf3, 0xeb, 0x44, + 0x24, 0x91, 0xf9, 0x75, 0x13, 0x29, 0xb4, 0xa0, 0xad, 0x1c, 0xb4, 0xf6, 0xb5, 0x0c, 0x63, 0xee, + 0xdd, 0x9e, 0x78, 0x58, 0x18, 0xda, 0x3a, 0x9e, 0xce, 0xf5, 0x2c, 0x8d, 0xdc, 0x58, 0x5c, 0x7b, + 0x53, 0x31, 0x15, 0x1e, 0xc2, 0x51, 0xfa, 0x11, 0x3b, 0x6c, 0xb0, 0x32, 0x72, 0xe7, 0x33, 0x81, + 0xbd, 0x0f, 0xd9, 0xb8, 0xbf, 0xba, 0x18, 0x05, 0xfc, 0x26, 0xe5, 0x4a, 0x53, 0x06, 0x2d, 0xb4, + 0xbc, 0x18, 0x31, 0xd2, 0x27, 0x83, 0x6e, 0x50, 0xb4, 0xd4, 0x06, 0x88, 0x16, 0x22, 0xbe, 0x1a, + 0xeb, 0x50, 0x6a, 0xb6, 0xd5, 0x27, 0x83, 0x76, 0x50, 0x41, 0xa8, 0x05, 0xdb, 0xd8, 0xbd, 0x5e, + 0x4e, 0x58, 0x1d, 0xd9, 0xb2, 0xa7, 0x2f, 0xa0, 0x7d, 0x93, 0x72, 0xb9, 0x7a, 0x27, 0x26, 0x9c, + 0x35, 0x90, 0xfc, 0x05, 0x38, 0xe7, 0xf0, 0x7f, 0xe5, 0x0e, 0x95, 0x88, 0xa5, 0xe2, 0xf4, 0x08, + 0x1a, 0xb8, 0x19, 0xcf, 0xe8, 0x0c, 0x77, 0xdd, 0xfc, 0xdb, 0x5d, 0x94, 0x06, 0x86, 0x74, 0x7c, + 0x68, 0x60, 0x4f, 0xcf, 0xa1, 0x15, 0x85, 0x3a, 0x9e, 0x71, 0xc5, 0x48, 0xbf, 0x3e, 0xe8, 0x0c, + 0x7b, 0xe5, 0x80, 0x89, 0xe8, 0xf6, 0xc4, 0x0d, 0xb8, 0x12, 0xa9, 0x8c, 0xf9, 0x38, 0x09, 0x97, + 0x2a, 0x28, 0xf4, 0xce, 0x08, 0x3a, 0x97, 0xa9, 0x9a, 0x15, 0x09, 0x9c, 0x41, 0x03, 0x99, 0x7c, + 0xf1, 0x3f, 0x7d, 0x8c, 0xda, 0xd9, 0x85, 0xae, 0x71, 0x31, 0xf7, 0x3b, 0x5f, 0x09, 0xec, 0x65, + 0x80, 0xbf, 0xd2, 0x5c, 0x15, 0xde, 0xa7, 0xb0, 0x2d, 0x4d, 0x69, 0xce, 0xec, 0xfa, 0x87, 0x77, + 0x0f, 0xbd, 0xda, 0xf7, 0x87, 0xde, 0xce, 0xa5, 0xe4, 0xe1, 0x62, 0x21, 0x62, 0xd4, 0x33, 0x12, + 0x94, 0x42, 0x7a, 0x0c, 0x4d, 0x5c, 0xad, 0xd8, 0x16, 0x8e, 0x1c, 0x3c, 0x39, 0x12, 0xe4, 0x22, + 0xfa, 0x12, 0xea, 0xf3, 0x89, 0x62, 0xf5, 0xbf, 0x69, 0x33, 0x85, 0x73, 0x04, 0x90, 0xc7, 0xae, + 0xb9, 0xa2, 0xcf, 0xca, 0x2d, 0x78, 0x58, 0x61, 0x37, 0xfc, 0x44, 0xa0, 0x99, 0x7d, 0x07, 0x97, + 0xf4, 0x0c, 0xfe, 0xcb, 0x2a, 0xba, 0x5f, 0x46, 0x52, 0xc9, 0xcd, 0x3a, 0x78, 0x84, 0xe6, 0x39, + 0xd4, 0xe8, 0x2b, 0x68, 0x97, 0x41, 0xd0, 0xe7, 0xbf, 0xa9, 0xaa, 0xe1, 0xfc, 0xd1, 0x60, 0x38, + 0x86, 0xd6, 0xfb, 0x94, 0xcb, 0x39, 0x97, 0xf4, 0x2d, 0xec, 0xbc, 0x99, 0x2f, 0x27, 0xe5, 0x73, + 0xa9, 0xf8, 0x3d, 0x7e, 0xca, 0x96, 0xf5, 0x14, 0x55, 0x98, 0xfa, 0xec, 0x6e, 0x6d, 0x93, 0xfb, + 0xb5, 0x4d, 0x7e, 0xac, 0x6d, 0xf2, 0x65, 0x63, 0xd7, 0xee, 0x37, 0x76, 0xed, 0xdb, 0xc6, 0xae, + 0x45, 0x4d, 0xfc, 0x7b, 0x9c, 0xfe, 0x0c, 0x00, 0x00, 0xff, 0xff, 0x16, 0x85, 0xb9, 0x7e, 0x87, 0x03, 0x00, 0x00, } @@ -734,6 +789,34 @@ func (m *PushBytesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Ids) > 0 { + for iNdEx := len(m.Ids) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.Ids[iNdEx].Size() + i -= size + if _, err := m.Ids[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintTempo(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } + if len(m.Traces) > 0 { + for iNdEx := len(m.Traces) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.Traces[iNdEx].Size() + i -= size + if _, err := m.Traces[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintTempo(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } if len(m.Requests) > 0 { for iNdEx := len(m.Requests) - 1; iNdEx >= 0; iNdEx-- { { @@ -751,6 +834,38 @@ func (m *PushBytesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *TraceBytes) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TraceBytes) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TraceBytes) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Traces) > 0 { + for iNdEx := len(m.Traces) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Traces[iNdEx]) + copy(dAtA[i:], m.Traces[iNdEx]) + i = encodeVarintTempo(dAtA, i, uint64(len(m.Traces[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func encodeVarintTempo(dAtA []byte, offset int, v uint64) int { offset -= sovTempo(v) base := offset @@ -849,6 +964,33 @@ func (m *PushBytesRequest) Size() (n int) { n += 1 + l + sovTempo(uint64(l)) } } + if len(m.Traces) > 0 { + for _, e := range m.Traces { + l = e.Size() + n += 1 + l + sovTempo(uint64(l)) + } + } + if len(m.Ids) > 0 { + for _, e := range m.Ids { + l = e.Size() + n += 1 + l + sovTempo(uint64(l)) + } + } + return n +} + +func (m *TraceBytes) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Traces) > 0 { + for _, b := range m.Traces { + l = len(b) + n += 1 + l + sovTempo(uint64(l)) + } + } return n } @@ -1402,12 +1544,164 @@ func (m *PushBytesRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - var v PreallocRequest + var v PreallocBytes m.Requests = append(m.Requests, v) if err := m.Requests[len(m.Requests)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Traces", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthTempo + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthTempo + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v PreallocBytes + m.Traces = append(m.Traces, v) + if err := m.Traces[len(m.Traces)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ids", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthTempo + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthTempo + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v PreallocBytes + m.Ids = append(m.Ids, v) + if err := m.Ids[len(m.Ids)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTempo(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTempo + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TraceBytes) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TraceBytes: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TraceBytes: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Traces", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthTempo + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthTempo + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Traces = append(m.Traces, make([]byte, postIndex-iNdEx)) + copy(m.Traces[len(m.Traces)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTempo(dAtA[iNdEx:]) diff --git a/pkg/tempopb/tempo.proto b/pkg/tempopb/tempo.proto index 3a9ffdb938e..2668018845e 100644 --- a/pkg/tempopb/tempo.proto +++ b/pkg/tempopb/tempo.proto @@ -14,6 +14,7 @@ service Querier { rpc FindTraceByID(TraceByIDRequest) returns (TraceByIDResponse) {}; } +// Read message TraceByIDRequest { bytes traceID = 1; string blockStart = 2; @@ -29,6 +30,7 @@ message Trace { repeated tempopb.trace.v1.ResourceSpans batches = 1; } +// Write message PushRequest { tempopb.trace.v1.ResourceSpans batch = 1; } @@ -37,6 +39,16 @@ message PushResponse { } message PushBytesRequest { - // pre-serialized PushRequests - repeated bytes requests = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocRequest"]; + // pre-marshalled PushRequests + repeated bytes requests = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes", deprecated=true]; + + // pre-marshalled Traces. length must match ids + repeated bytes traces = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"]; + // trace ids. length must match traces + repeated bytes ids = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"]; } + +message TraceBytes { + // pre-marshalled Traces + repeated bytes traces = 1; +} \ No newline at end of file diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 093dbef2720..7f6d1cc2aa6 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -3,6 +3,7 @@ package test import ( "math/rand" + "github.com/gogo/protobuf/proto" "github.com/grafana/tempo/pkg/tempopb" v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1" @@ -46,6 +47,27 @@ func MakeRequest(spans int, traceID []byte) *tempopb.PushRequest { return req } +func MakeTraceBytes(requests int, traceID []byte) *tempopb.TraceBytes { + trace := &tempopb.Trace{ + Batches: make([]*v1_trace.ResourceSpans, 0), + } + + for i := 0; i < requests; i++ { + trace.Batches = append(trace.Batches, MakeRequest(rand.Int()%20+1, traceID).Batch) + } + + bytes, err := proto.Marshal(trace) + if err != nil { + panic(err) + } + + traceBytes := &tempopb.TraceBytes{ + Traces: [][]byte{bytes}, + } + + return traceBytes +} + func MakeTrace(requests int, traceID []byte) *tempopb.Trace { trace := &tempopb.Trace{ Batches: make([]*v1_trace.ResourceSpans, 0), diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index c36ad615eaa..6b5bb3eae06 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -199,7 +199,7 @@ func (a *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte if err != nil { return nil, err } - if records == nil { + if len(records) == 0 { return nil, nil } diff --git a/tempodb/wal/append_block_test.go b/tempodb/wal/append_block_test.go index 612c7f145cd..257d26f9b21 100644 --- a/tempodb/wal/append_block_test.go +++ b/tempodb/wal/append_block_test.go @@ -8,10 +8,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestAppendBlockBadFile(t *testing.T) { - -} - func TestFullFilename(t *testing.T) { tests := []struct { name string diff --git a/vendor/modules.txt b/vendor/modules.txt index 449399099ea..11b70897c2c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -814,6 +814,7 @@ github.com/satori/go.uuid # github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 github.com/sean-/seed # github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e +## explicit github.com/segmentio/fasthash/fnv1a # github.com/sercand/kuberesolver v2.4.0+incompatible => github.com/sercand/kuberesolver v2.4.0+incompatible github.com/sercand/kuberesolver