Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,16 @@ func queryBlock(ctx context.Context, r backend.Reader, _ backend.Compactor, bloc
searchOpts := common.SearchOptions{}
tempodb.SearchConfig{}.ApplyToOptions(&searchOpts)

trace, err := block.FindTraceByID(ctx, traceID, searchOpts)
res, err := block.FindTraceByID(ctx, traceID, searchOpts)
if err != nil {
return nil, err
}

if trace == nil {
if res == nil || res.Trace == nil {
return nil, nil
}

return &queryResults{
blockID: id,
trace: trace,
trace: res.Trace,
}, nil
}
31 changes: 23 additions & 8 deletions modules/frontend/combiner/trace_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"sync"

"github.com/grafana/tempo/pkg/collector"

"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/api"
tempo_io "github.com/grafana/tempo/pkg/io"
Expand All @@ -20,14 +22,16 @@ const (
internalErrorMsg = "internal error"
)

type traceByIDCombiner struct {
type TraceByIDCombiner struct {
mu sync.Mutex

c *trace.Combiner
contentType string

code int
statusMessage string

mc *collector.MetricsCollector
Comment thread
joe-elliott marked this conversation as resolved.
}

// NewTraceByID returns a trace id combiner. The trace by id combiner has a few different behaviors then the others
Expand All @@ -36,14 +40,23 @@ type traceByIDCombiner struct {
// - runs the zipkin dedupe logic on the fully combined trace
// - encode the returned trace as either json or proto depending on the request
func NewTraceByID(maxBytes int, contentType string) Combiner {
return &traceByIDCombiner{
return &TraceByIDCombiner{
c: trace.NewCombiner(maxBytes, false),
code: http.StatusNotFound,
contentType: contentType,
mc: collector.NewMetricsCollector(),
}
}

func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {
func NewTypedTraceByID(maxBytes int, contentType string) *TraceByIDCombiner {
return NewTraceByID(maxBytes, contentType).(*TraceByIDCombiner)
}

func (c *TraceByIDCombiner) TotalBytesProcessed() uint64 {
return c.mc.TotalValue()
}

func (c *TraceByIDCombiner) AddResponse(r PipelineResponse) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -85,17 +98,19 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {

// Consume the trace
_, err = c.c.Consume(resp.Trace)

if errors.Is(err, trace.ErrTraceTooLarge) {
c.code = http.StatusUnprocessableEntity
c.statusMessage = fmt.Sprint(err)
return nil
}
if resp.Metrics != nil {
c.mc.Add(resp.Metrics.InspectedBytes)
}

return err
}

func (c *traceByIDCombiner) HTTPFinal() (*http.Response, error) {
func (c *TraceByIDCombiner) HTTPFinal() (*http.Response, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -142,20 +157,20 @@ func (c *traceByIDCombiner) HTTPFinal() (*http.Response, error) {
}, nil
}

func (c *traceByIDCombiner) StatusCode() int {
func (c *TraceByIDCombiner) StatusCode() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.code
}

// ShouldQuit returns true if the response should be returned early.
func (c *traceByIDCombiner) ShouldQuit() bool {
func (c *TraceByIDCombiner) ShouldQuit() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.shouldQuit()
}

func (c *traceByIDCombiner) shouldQuit() bool {
func (c *TraceByIDCombiner) shouldQuit() bool {
if c.code/100 == 5 { // Bail on 5xx
return true
}
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/combiner/trace_by_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestTraceByIDHonorsContentType(t *testing.T) {

// json
c := NewTraceByID(0, api.HeaderAcceptJSON)
err := c.AddResponse(toHTTPProtoResponse(t, &tempopb.TraceByIDResponse{Trace: expected}, 200))
err := c.AddResponse(toHTTPProtoResponse(t, &tempopb.TraceByIDResponse{Trace: expected, Metrics: &tempopb.TraceByIDMetrics{InspectedBytes: 100}}, 200))
require.NoError(t, err)

resp, err := c.HTTPFinal()
Expand All @@ -78,7 +78,7 @@ func TestTraceByIDHonorsContentType(t *testing.T) {

// proto
c = NewTraceByID(0, api.HeaderAcceptProtobuf)
err = c.AddResponse(toHTTPProtoResponse(t, &tempopb.TraceByIDResponse{Trace: expected}, 200))
err = c.AddResponse(toHTTPProtoResponse(t, &tempopb.TraceByIDResponse{Trace: expected, Metrics: &tempopb.TraceByIDMetrics{InspectedBytes: 100}}, 200))
require.NoError(t, err)

resp, err = c.HTTPFinal()
Expand Down
9 changes: 9 additions & 0 deletions modules/frontend/combiner/trace_by_id_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

func NewTypedTraceByIDV2(maxBytes int, marshalingFormat string) GRPCCombiner[*tempopb.TraceByIDResponse] {
return NewTraceByIDV2(maxBytes, marshalingFormat).(GRPCCombiner[*tempopb.TraceByIDResponse])
}

func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
combiner := trace.NewCombiner(maxBytes, true)
var partialTrace bool
var inspectedBytes uint64
Comment thread
joe-elliott marked this conversation as resolved.
gc := &genericCombiner[*tempopb.TraceByIDResponse]{
combine: func(partial *tempopb.TraceByIDResponse, _ *tempopb.TraceByIDResponse, _ PipelineResponse) error {
if partial.Status == tempopb.TraceByIDResponse_PARTIAL {
partialTrace = true
}
if partial.Metrics != nil {
inspectedBytes += partial.Metrics.InspectedBytes
}
_, err := combiner.Consume(partial.Trace)
return err
},
Expand All @@ -28,6 +36,7 @@ func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
deduper := newDeduper()
traceResult = deduper.dedupe(traceResult)
resp.Trace = traceResult
resp.Metrics = &tempopb.TraceByIDMetrics{InspectedBytes: inspectedBytes}

if partialTrace || combiner.IsPartialTrace() {
resp.Status = tempopb.TraceByIDResponse_PARTIAL
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

traces := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTraceByID, logger)
tracesV2 := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTraceByIDV2, logger)
traces := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTypedTraceByID, logger)
tracesV2 := newTraceIDV2Handler(cfg, tracePipeline, o, combiner.NewTypedTraceByIDV2, logger)
search := newSearchHTTPHandler(cfg, searchPipeline, logger)
searchTags := newTagsHTTPHandler(cfg, searchTagsPipeline, o, logger)
searchTagsV2 := newTagsV2HTTPHandler(cfg, searchTagsPipeline, o, logger)
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/search_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *mockReader) FetchTagValues(context.Context, *backend.BlockMeta, traceql
return nil
}

func (m *mockReader) Find(context.Context, string, common.ID, string, string, int64, int64, common.SearchOptions) ([]*tempopb.Trace, []error, error) {
func (m *mockReader) Find(context.Context, string, common.ID, string, string, int64, int64, common.SearchOptions) ([]*tempopb.TraceByIDResponse, []error, error) {
return nil, nil, nil
}

Expand Down
9 changes: 5 additions & 4 deletions modules/frontend/slos.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ var (
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"tenant", "op"})

searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})
metadataThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metadataOp})
metricsThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metricsOp})
traceByIDThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})
metadataThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metadataOp})
metricsThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metricsOp})
)

type (
Expand All @@ -72,7 +73,7 @@ type (

// todo: remove post hooks and implement as a handler
func traceByIDSLOPostHook(cfg SLOConfig) handlerPostHook {
return sloHook(traceByIDCounter, sloTraceByIDCounter, nil, cfg)
return sloHook(traceByIDCounter, sloTraceByIDCounter, traceByIDThroughput, cfg)
}

func searchSLOPostHook(cfg SLOConfig) handlerPostHook {
Expand Down
82 changes: 80 additions & 2 deletions modules/frontend/traceid_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/grafana/tempo/pkg/tempopb"

"github.com/go-kit/log"
"github.com/go-kit/log/level" //nolint:all //deprecated
"github.com/grafana/dskit/user"
Expand All @@ -16,7 +18,7 @@ import (
)

// newTraceIDHandler creates a http.handler for trace by id requests
func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, combinerFn func(int, string) combiner.Combiner, logger log.Logger) http.RoundTripper {
func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, combinerFn func(int, string) *combiner.TraceByIDCombiner, logger log.Logger) http.RoundTripper {
postSLOHook := traceByIDSLOPostHook(cfg.TraceByID.SLO)

return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
Expand Down Expand Up @@ -71,7 +73,83 @@ func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.Pipe
resp, err := rt.RoundTrip(req)
elapsed := time.Since(start)

postSLOHook(resp, tenant, 0, elapsed, err)
bytesProcessed := comb.TotalBytesProcessed()
postSLOHook(resp, tenant, bytesProcessed, elapsed, err)

level.Info(logger).Log(
"msg", "trace id response",
"tenant", tenant,
"path", req.URL.Path,
"duration_seconds", elapsed.Seconds(),
"err", err)

return resp, err
})
}

// newTraceIDV2Handler creates a http.handler for trace by id requests
func newTraceIDV2Handler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, combinerFn func(int, string) combiner.GRPCCombiner[*tempopb.TraceByIDResponse], logger log.Logger) http.RoundTripper {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split V1 and V2 handlers as they use different combiners and collect metrics in different ways.

postSLOHook := traceByIDSLOPostHook(cfg.TraceByID.SLO)

return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
tenant, err := user.ExtractOrgID(req.Context())
if err != nil {
level.Error(logger).Log("msg", "trace id: failed to extract tenant id", "err", err)
return &http.Response{
StatusCode: http.StatusBadRequest,
Status: http.StatusText(http.StatusBadRequest),
Body: io.NopCloser(strings.NewReader(err.Error())),
}, nil
}

// validate traceID
_, err = api.ParseTraceID(req)
if err != nil {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(err.Error())),
Header: http.Header{},
}, nil
}

// validate start and end parameter
_, _, _, _, _, reqErr := api.ValidateAndSanitizeRequest(req)
if reqErr != nil {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(reqErr.Error())),
Header: http.Header{},
}, nil
}

// check marshalling format
marshallingFormat := api.HeaderAcceptJSON
if req.Header.Get(api.HeaderAccept) == api.HeaderAcceptProtobuf {
marshallingFormat = api.HeaderAcceptProtobuf
}

// enforce all communication internal to Tempo to be in protobuf bytes
req.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)

level.Info(logger).Log(
"msg", "trace id request",
"tenant", tenant,
"path", req.URL.Path)

comb := combinerFn(o.MaxBytesPerTrace(tenant), marshallingFormat)
rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb)

start := time.Now()
resp, err := rt.RoundTrip(req)
elapsed := time.Since(start)

var bytesProcessed uint64
findResp, _ := comb.GRPCFinal()
Comment thread
joe-elliott marked this conversation as resolved.
if findResp != nil && findResp.Metrics != nil {
bytesProcessed = findResp.Metrics.InspectedBytes
}

postSLOHook(resp, tenant, bytesProcessed, elapsed, err)

level.Info(logger).Log(
"msg", "trace id response",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ type mockBlock struct {
meta *backend.BlockMeta
}

func (m *mockBlock) FindTraceByID(context.Context, common.ID, common.SearchOptions) (*tempopb.Trace, error) {
func (m *mockBlock) FindTraceByID(context.Context, common.ID, common.SearchOptions) (*tempopb.TraceByIDResponse, error) {
return nil, nil
}

Expand Down
8 changes: 2 additions & 6 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,12 @@ func (i *Ingester) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequ
return &tempopb.TraceByIDResponse{}, nil
}

trace, err := inst.FindTraceByID(ctx, req.TraceID, req.AllowPartialTrace)
res, err = inst.FindTraceByID(ctx, req.TraceID, req.AllowPartialTrace)
if err != nil {
return nil, err
}

span.AddEvent("trace found", oteltrace.WithAttributes(attribute.Bool("found", trace != nil)))

res = &tempopb.TraceByIDResponse{
Trace: trace,
}
span.AddEvent("trace found", oteltrace.WithAttributes(attribute.Bool("found", res != nil && res.Trace != nil)))

return res, nil
}
Expand Down
Loading