Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## main / unreleased
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
* [CHANGE] slo: include request cancellations within SLO [#4355](https://github.com/grafana/tempo/pull/4355) (@electron0zero)
Request cancellations are exposed under the `result` label in `tempo_query_frontend_queries_total` and `tempo_query_frontend_queries_within_slo_total` with `completed` or `canceled` values to differentiate between completed and canceled requests.
* [CHANGE] update default config values to better align with production workloads [#4340](https://github.com/grafana/tempo/pull/4340) (@electron0zero)
* [CHANGE] fix deprecation warning by switching to DoBatchWithOptions [#4343](https://github.com/grafana/tempo/pull/4343) (@dastrobu)
* [CHANGE] **BREAKING CHANGE** Tempo serverless is now deprecated and will be removed in an upcoming release [#4017](https://github.com/grafana/tempo/pull/4017/) (@electron0zero)
Expand Down
7 changes: 6 additions & 1 deletion example/docker-compose/local/tempo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ query_frontend:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 100ms
metrics:
max_duration: 120h # maximum duration of a metrics query, increase for local setups
query_backend_after: 5m
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09

distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
Expand All @@ -43,7 +48,7 @@ ingester:

compactor:
compaction:
block_retention: 1h # overall Tempo trace retention. set for demo purposes
block_retention: 24h # overall Tempo trace retention. set for demo purposes

metrics_generator:
registry:
Expand Down
8 changes: 7 additions & 1 deletion modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/util"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -125,6 +126,8 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {

// HTTPFinal, GRPCComplete, and GRPCDiff are all responsible for returning something
// usable in grpc streaming/http response.
// NOTE: returning error is reserved for unexpected errors, HTTP errors will be returned
// in the response body. callers should check the http status code.
func (c *genericCombiner[T]) HTTPFinal() (*http.Response, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -217,6 +220,9 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
grpcErr = status.Error(codes.ResourceExhausted, c.httpRespBody)
case http.StatusBadRequest:
grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody)
case util.StatusClientClosedRequest:
// HTTP 499 is mapped to codes.Canceled grpc error
grpcErr = status.Error(codes.Canceled, c.httpRespBody)
default:
if c.httpStatusCode/100 == 5 {
grpcErr = status.Error(codes.Internal, c.httpRespBody)
Expand All @@ -226,7 +232,7 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
}
httpResp := &http.Response{
StatusCode: c.httpStatusCode,
Status: http.StatusText(c.httpStatusCode),
Status: util.StatusText(c.httpStatusCode),
Body: io.NopCloser(strings.NewReader(c.httpRespBody)),
}

Expand Down
14 changes: 13 additions & 1 deletion modules/frontend/combiner/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gogo/status"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
)
Expand Down Expand Up @@ -55,6 +56,17 @@ func TestErroredResponse(t *testing.T) {
},
expectedErr: status.Error(codes.InvalidArgument, "foo"),
},
{
name: "499",
statusCode: util.StatusClientClosedRequest,
respBody: "foo",
expectedResp: &http.Response{
StatusCode: util.StatusClientClosedRequest,
Status: util.StatusTextClientClosedRequest,
Body: io.NopCloser(strings.NewReader("foo")),
},
expectedErr: status.Error(codes.Canceled, "foo"),
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -219,7 +231,7 @@ func newTestResponse(t *testing.T) *testPipelineResponse {
func newFailedTestResponse() *testPipelineResponse {
rec := httptest.NewRecorder()
rec.WriteHeader(http.StatusInternalServerError)

_, _ = rec.Write([]byte("foo"))
return &testPipelineResponse{
r: rec.Result(),
}
Expand Down
21 changes: 8 additions & 13 deletions modules/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/tempo/pkg/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -22,16 +24,14 @@ import (
)

const (
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
StatusClientClosedRequest = 499
// nil response in ServeHTTP
NilResponseError = "nil resp in ServeHTTP"
)

var (
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
errCanceled = httpgrpc.Error(util.StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Error(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Error(http.StatusRequestEntityTooLarge, "http: request body too large")
)

// handler exists to wrap a roundtripper with an HTTP handler. It wraps all
Expand Down Expand Up @@ -88,7 +88,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logMessage = append(
logMessage,
"status", statusCode,
"err", err.Error(),
"error", err.Error(),
"response_size", 0,
)
level.Info(f.logger).Log(logMessage...)
Expand Down Expand Up @@ -157,18 +157,13 @@ func copyHeader(dst, src http.Header) {
// httpgrpc errors can bubble up to here and should be translated to http errors. It returns
// httpgrpc error.
func writeError(w http.ResponseWriter, err error) error {
if errors.Is(err, context.Canceled) {
if grpcutil.IsCanceled(err) {
err = errCanceled
} else if errors.Is(err, context.DeadlineExceeded) {
err = errDeadlineExceeded
} else if isRequestBodyTooLarge(err) {
} else if util.IsRequestBodyTooLarge(err) {
err = errRequestEntityTooLarge
}
httpgrpc.WriteError(w, err)
return err
}

// isRequestBodyTooLarge returns true if the error is "http: request body too large".
func isRequestBodyTooLarge(err error) bool {
return err != nil && strings.Contains(err.Error(), "http: request body too large")
}
32 changes: 28 additions & 4 deletions modules/frontend/pipeline/collector_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"errors"
"net/http"
"time"

Expand All @@ -27,24 +28,33 @@ func NewGRPCCollector[T combiner.TResponse](next AsyncRoundTripper[combiner.Pipe
}
}

// Handle
// RoundTrip implements the http.RoundTripper interface
func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
ctx := req.Context()
ctx, cancel := context.WithCancel(ctx) // create a new context with a cancel function
defer cancel()

ctx, span := tracer.Start(ctx, "GRPCCollector.RoundTrip")
defer span.End()

req = req.WithContext(ctx)
resps, err := c.next.RoundTrip(NewHTTPRequest(req))
if err != nil {
return grpcError(err)
}
span.AddEvent("next.RoundTrip done")

lastUpdate := time.Now()

err = consumeAndCombineResponses(ctx, c.consumers, resps, c.combiner, func() error {
// sendDiffCb should return an error if the context is cancelled,
// callback's error is used to exit early from the loop and return the error to the caller
sendDiffCb := func() error {
// check if we should send an update
if time.Since(lastUpdate) > 500*time.Millisecond {
lastUpdate = time.Now()
// check and return the context errors, like ctx cancelled, etc
if req.Context().Err() != nil {
return req.Context().Err()
}

// send a diff only during streaming
resp, err := c.combiner.GRPCDiff()
Expand All @@ -58,16 +68,24 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
}

return nil
})
}

err = consumeAndCombineResponses(ctx, c.consumers, resps, c.combiner, sendDiffCb)
if err != nil {
return grpcError(err)
}
span.AddEvent("consumeAndCombineResponses done")

// send the final diff if there is anything left
resp, err := c.combiner.GRPCDiff()
if err != nil {
return grpcError(err)
}
span.AddEvent("final combiner.GRPCDiff() done")
// check and return the context errors, like ctx cancelled, etc
if req.Context().Err() != nil {
return grpcError(req.Context().Err())
}
err = c.send(resp)
if err != nil {
return grpcError(err)
Expand All @@ -83,5 +101,11 @@ func grpcError(err error) error {
return err
}

// if this is context cancelled, we return a grpc cancelled error
if errors.Is(err, context.Canceled) {
return status.Error(codes.Canceled, err.Error())
}

// rest all fall into internal server error
return status.Error(codes.Internal, err.Error())
}
21 changes: 19 additions & 2 deletions modules/frontend/pipeline/collector_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,39 @@ func NewHTTPCollector(next AsyncRoundTripper[combiner.PipelineResponse], consume
}
}

// Handle
// RoundTrip implements the http.RoundTripper interface
func (r httpCollector) RoundTrip(req *http.Request) (*http.Response, error) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
ctx, span := tracer.Start(ctx, "httpCollector.RoundTrip")
defer span.End()

req = req.WithContext(ctx)

resps, err := r.next.RoundTrip(NewHTTPRequest(req))
if err != nil {
return nil, err
}
span.AddEvent("next.RoundTrip done")

err = consumeAndCombineResponses(ctx, r.consumers, resps, r.combiner, nil)
if err != nil {
return nil, err
}
return r.combiner.HTTPFinal()
span.AddEvent("consumeAndCombineResponses done")

resp, err := r.combiner.HTTPFinal()
if err != nil {
return nil, err
}
span.AddEvent("combiner.HTTPFinal done")

// we don't get context cancellation errors from the HTTPFinal,
// so we need to check, and return to downstream callers
if req.Context().Err() != nil {
return nil, req.Context().Err()
}
return resp, err
}

func consumeAndCombineResponses(ctx context.Context, consumers int, resps Responses[combiner.PipelineResponse], c combiner.Combiner, callback func() error) error {
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
}()
grpcReq := &tempopb.SearchRequest{}
err := f.streamingSearch(grpcReq, srv)
require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}

func TestSearchLimitHonored(t *testing.T) {
Expand Down
54 changes: 42 additions & 12 deletions modules/frontend/slos.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/tempo/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
Expand All @@ -15,6 +17,9 @@ const (
searchOp = "search"
metadataOp = "metadata"
metricsOp = "metrics"

resultCompleted = "completed"
resultCanceled = "canceled"
)

var (
Expand All @@ -25,7 +30,7 @@ var (
Namespace: "tempo",
Name: "query_frontend_queries_within_slo_total",
Help: "Total Queries within SLO per tenant",
}, []string{"tenant", "op"})
}, []string{"tenant", "op", "result"})

sloTraceByIDCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
sloSearchCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
Expand All @@ -39,7 +44,7 @@ var (
Namespace: "tempo",
Name: "query_frontend_queries_total",
Help: "Total queries received per tenant.",
}, []string{"tenant", "op"})
}, []string{"tenant", "op", "result"})

traceByIDCounter = queriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchCounter = queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
Expand Down Expand Up @@ -84,21 +89,46 @@ func metricsSLOPostHook(cfg SLOConfig) handlerPostHook {

func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec, throughputVec prometheus.ObserverVec, cfg SLOConfig) handlerPostHook {
return func(resp *http.Response, tenant string, bytesProcessed uint64, latency time.Duration, err error) {
// first record all queries
allByTenantCounter.WithLabelValues(tenant).Inc()

// most errors are SLO violations
// most errors are SLO violations but we have few exceptions.
if err != nil {
// however, if this is a grpc resource exhausted error (429) or invalid argument (400) then we are within SLO
// However, gRPC resource exhausted error (429), invalid argument (400), not found (404) and
// request cancellations are considered within the SLO.
switch status.Code(err) {
case codes.ResourceExhausted,
codes.InvalidArgument,
codes.NotFound:
withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
case codes.ResourceExhausted, codes.InvalidArgument, codes.NotFound:
allByTenantCounter.WithLabelValues(tenant, resultCompleted).Inc()
withinSLOByTenantCounter.WithLabelValues(tenant, resultCompleted).Inc()
return
}

if grpcutil.IsCanceled(err) {
allByTenantCounter.WithLabelValues(tenant, resultCanceled).Inc()
withinSLOByTenantCounter.WithLabelValues(tenant, resultCanceled).Inc()
return
}

// check for the response and 499 in the status code, can come from http pipeline along with error
if resp != nil && resp.StatusCode == util.StatusClientClosedRequest {
allByTenantCounter.WithLabelValues(tenant, resultCanceled).Inc()
withinSLOByTenantCounter.WithLabelValues(tenant, resultCanceled).Inc()
return
}

// in case we have error, that doesn't fall into the above categories, it's a SLO violation
// so only increment the allByTenantCounter
allByTenantCounter.WithLabelValues(tenant, resultCompleted).Inc()
return
}

// we don't always get error in case of http pipeline, check for 499 status code
if resp != nil && resp.StatusCode == util.StatusClientClosedRequest {
allByTenantCounter.WithLabelValues(tenant, resultCanceled).Inc()
withinSLOByTenantCounter.WithLabelValues(tenant, resultCanceled).Inc()
return
}

// record all queries
allByTenantCounter.WithLabelValues(tenant, resultCompleted).Inc()

// all 200s/300s/400s are success
if resp != nil && resp.StatusCode >= 500 {
return
Expand Down Expand Up @@ -131,6 +161,6 @@ func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec
return
}

withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
withinSLOByTenantCounter.WithLabelValues(tenant, resultCompleted).Inc()
}
}
Loading