Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ac1ba9d
wip: convert to passing premarshalled traces
joe-elliott May 5, 2021
ec25a4d
restored tests
joe-elliott May 5, 2021
f7274e6
receive Batches in ingester
joe-elliott May 5, 2021
d4d6b4b
ingester tests
joe-elliott May 5, 2021
f84a1f2
removed unused struct field
joe-elliott May 5, 2021
fb1dc53
added ids to push request
joe-elliott May 5, 2021
50c79cf
first pass v1 dataEncoding
joe-elliott May 6, 2021
454c8ff
perf improvements
joe-elliott May 6, 2021
fc90be9
made some assumptions
joe-elliott May 6, 2021
4595611
jpes
joe-elliott May 6, 2021
abc8154
hid marshal
joe-elliott May 6, 2021
afbc2fa
batches => traces
joe-elliott May 6, 2021
a657502
Added sorting and tests
joe-elliott May 10, 2021
43a0a82
Added tess
joe-elliott May 10, 2021
9b3d2e9
Added cross encoding combine tests
joe-elliott May 10, 2021
abc31f1
Additional tests/fixes
joe-elliott May 11, 2021
3ccd282
improved instance tests
joe-elliott May 11, 2021
fc6269d
instance tests
joe-elliott May 11, 2021
046c997
improved err msgs
joe-elliott May 11, 2021
21e0bfd
fix slice reuse
joe-elliott May 11, 2021
90804f2
improved test
joe-elliott May 11, 2021
31fd203
improved query testing
joe-elliott May 11, 2021
c0bec48
Merge branch 'main' into ingester-no-alloc
joe-elliott May 11, 2021
a5ac595
lint/context fix
joe-elliott May 12, 2021
867e7b8
Logs/tests
joe-elliott May 12, 2021
dfcedbb
Stop reusing trace ids!
joe-elliott May 12, 2021
6c4ca9b
Removed id from trace
joe-elliott May 12, 2021
2723477
improved comment
joe-elliott May 12, 2021
c3b2b61
changelog
joe-elliott May 12, 2021
f0f0207
go.mod
joe-elliott May 12, 2021
dd48061
simplified SortTraceBytes
joe-elliott May 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679)
* [ENHANCEMENT] Zipkin Support - CombineTraces. [#688](https://github.com/grafana/tempo/pull/688)
* [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687)
* [ENHANCEMENT] Reduce marshalling in the ingesters to improve performance. [#694](https://github.com/grafana/tempo/pull/694)
This is kind of a **breaking change**. Rollout all ingesters before any other component to prevent dropped spans.
* [CHANGE] Fix Query Frontend grpc settings to avoid noisy error log. [#690](https://github.com/grafana/tempo/pull/690)

## v0.7.0
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.20.0
github.com/prometheus/prometheus v1.8.2-0.20210324152458-c7a62b95cea0
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0
Expand Down
83 changes: 49 additions & 34 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package distributor
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -13,6 +12,7 @@ import (
cortex_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/gogo/status"
"github.com/segmentio/fasthash/fnv1a"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -235,30 +235,29 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp
req.Size())
}

keys, traces, err := requestsByTraceID(req, userID, spanCount)
keys, traces, ids, err := requestsByTraceID(req, userID, spanCount)
if err != nil {
metricDiscardedSpans.WithLabelValues(reasonInternalError, userID).Add(float64(spanCount))
return nil, err
}

err = d.sendToIngestersViaBytes(ctx, userID, traces, keys)
err = d.sendToIngestersViaBytes(ctx, userID, traces, keys, ids)
if err != nil {
recordDiscaredSpans(err, userID, spanCount)
}

return nil, err // PushRequest is ignored, so no reason to create one
}

func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*tempopb.PushRequest, keys []uint32) error {

func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*tempopb.Trace, keys []uint32, ids [][]byte) error {
// Marshal to bytes once
rawRequests := make([][]byte, len(traces))
marshalledTraces := make([][]byte, len(traces))
for i, t := range traces {
b, err := t.Marshal()
if err != nil {
return errors.Wrap(err, "failed to marshal PushRequest")
}
rawRequests[i] = b
marshalledTraces[i] = b
}

op := ring.WriteNoExtend
Expand All @@ -267,17 +266,18 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string
}

err := ring.DoBatch(ctx, op, d.ingestersRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {

localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)

req := tempopb.PushBytesRequest{
Requests: make([]tempopb.PreallocRequest, len(indexes)),
Traces: make([]tempopb.PreallocBytes, len(indexes)),
Ids: make([]tempopb.PreallocBytes, len(indexes)),
}

for i, j := range indexes {
req.Requests[i].Request = rawRequests[j][0:]
req.Traces[i].Slice = marshalledTraces[j][0:]
req.Ids[i].Slice = ids[j]
}

c, err := d.pool.GetClientFor(ingester.Addr)
Expand Down Expand Up @@ -306,29 +306,37 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) ([]uint32, []*tempopb.PushRequest, error) {
const expectedTracesPerBatch = 10 // roughly what we're seeing through metrics
expectedSpansPerTrace := spanCount / expectedTracesPerBatch
// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring
// and traces to pass onto the ingesters.
func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) ([]uint32, []*tempopb.Trace, [][]byte, error) {
type traceAndID struct {
id []byte
trace *tempopb.Trace
}

requestsByTrace := make(map[uint32]*tempopb.PushRequest)
spansByILS := make(map[string]*v1.InstrumentationLibrarySpans)
const tracesPerBatch = 20 // p50 of internal env
tracesByID := make(map[uint32]*traceAndID, tracesPerBatch)
spansByILS := make(map[uint32]*v1.InstrumentationLibrarySpans)

for _, ils := range req.Batch.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
if !validation.ValidTraceID(span.TraceId) {
return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit")
traceID := span.TraceId
if !validation.ValidTraceID(traceID) {
return nil, nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit")
}

traceKey := util.TokenFor(userID, span.TraceId)
ilsKey := strconv.Itoa(int(traceKey))
traceKey := util.TokenFor(userID, traceID)
ilsKey := traceKey
if ils.InstrumentationLibrary != nil {
ilsKey = ilsKey + ils.InstrumentationLibrary.Name + ils.InstrumentationLibrary.Version
ilsKey = fnv1a.AddString32(ilsKey, ils.InstrumentationLibrary.Name)
ilsKey = fnv1a.AddString32(ilsKey, ils.InstrumentationLibrary.Version)
}

existingILS, ok := spansByILS[ilsKey]
if !ok {
existingILS = &v1.InstrumentationLibrarySpans{
InstrumentationLibrary: ils.InstrumentationLibrary,
Spans: make([]*v1.Span, 0, expectedSpansPerTrace),
Spans: make([]*v1.Span, 0, spanCount/tracesPerBatch),
}
spansByILS[ilsKey] = existingILS
}
Expand All @@ -339,31 +347,38 @@ func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) (
continue
}

existingReq, ok := requestsByTrace[traceKey]
existingTrace, ok := tracesByID[traceKey]
if !ok {
existingReq = &tempopb.PushRequest{
Batch: &v1.ResourceSpans{
InstrumentationLibrarySpans: make([]*v1.InstrumentationLibrarySpans, 0, len(req.Batch.InstrumentationLibrarySpans)), // assume most spans belong to the same trace
Resource: req.Batch.Resource,
existingTrace = &traceAndID{
id: traceID,
trace: &tempopb.Trace{
Batches: make([]*v1.ResourceSpans, 0, spanCount/tracesPerBatch),
},
}
requestsByTrace[traceKey] = existingReq

tracesByID[traceKey] = existingTrace
}
existingReq.Batch.InstrumentationLibrarySpans = append(existingReq.Batch.InstrumentationLibrarySpans, existingILS)

existingTrace.trace.Batches = append(existingTrace.trace.Batches, &v1.ResourceSpans{
Resource: req.Batch.Resource,
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{existingILS},
})
}
}

metricTracesPerBatch.Observe(float64(len(requestsByTrace)))
metricTracesPerBatch.Observe(float64(len(tracesByID)))

keys := make([]uint32, 0, len(requestsByTrace))
pushRequests := make([]*tempopb.PushRequest, 0, len(requestsByTrace))
keys := make([]uint32, 0, len(tracesByID))
traces := make([]*tempopb.Trace, 0, len(tracesByID))
ids := make([][]byte, 0, len(tracesByID))

for k, r := range requestsByTrace {
for k, r := range tracesByID {
keys = append(keys, k)
pushRequests = append(pushRequests, r)
traces = append(traces, r.trace)
ids = append(ids, r.id)
}

return keys, pushRequests, nil
return keys, traces, ids, nil
}

func recordDiscaredSpans(err error, userID string, spanCount int) {
Expand Down
Loading