Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* [FEATURE] Add entity-based limiting mode for metrics-generator as an alternative to series-based limiting. [#5788](https://github.com/grafana/tempo/pull/5788) (@Logiraptor)
* [FEATURE] Add `tempo_metrics_generator_registry_active_series_demand_estimate` that estimates metrics-generator active series demand even when the active series limit is reached [#5710](https://github.com/grafana/tempo/pull/5710) (@carles-grafana)
* [FEATURE] Metrics generator will now produce overflow series to capture new data once limits are hit. These series have the label `metric_overflow="true"`. [#5954](https://github.com/grafana/tempo/pull/5954) (@Logiraptor)
* [FEATURE] Add support for external storage to trace by id endpoint [#6185](https://github.com/grafana/tempo/pull/6185) (@Logiraptor)
* [ENHANCEMENT] add database_name_attributes config to service graph processor [#5398](https://github.com/grafana/tempo/pull/5398) (@KyriosGN0)
* [ENHANCEMENT] Added validation mode and tests for tempo-vulture [#5605](https://github.com/grafana/tempo/pull/5605) (@davidham)
* [ENHANCEMENT] Add SSE-C encryption support to S3 backend [#5789](https://github.com/grafana/tempo/pull/5789) (@steffsas)
Expand Down
19 changes: 19 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,11 @@ query_frontend:
# (default: 0)
[concurrent_shards: <int>]

# Enable external trace source for trace-by-ID queries. When enabled,
# the frontend will create an additional shard to query the external endpoint
# configured in the querier.
[external_enabled: <bool> | default = false]

# If set to a non-zero value, its value will be used to decide if metadata query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s data.
# NOTE: Requires `duration_slo` AND `throughput_bytes_slo` to be configured.
Expand Down Expand Up @@ -900,6 +905,20 @@ querier:
# Timeout for trace lookup requests
[query_timeout: <duration> | default = 10s]

# External trace source configuration. When enabled, trace-by-ID queries
# will also fetch trace data from an external HTTP endpoint that implements
# Tempo's TraceIDV2 API (/api/v2/traces/{traceID}).
external:
# Enable querying an external endpoint for trace data.
[enabled: <bool> | default = false]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need the enabled flag in here as well? I see that we have external_enabled config under the frontend section as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not strictly necessary. Is this what you had in mind? 7d1c116


# The URL of the external service that implements the TraceIDV2 API.
# Example: "http://external-service:3200"
[endpoint: <string>]

# Timeout for requests to the external endpoint.
[timeout: <duration> | default = 10s]

search:
# Timeout for search requests
[query_timeout: <duration> | default = 30s]
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ querier:
query_timeout: 30s
trace_by_id:
query_timeout: 10s
external:
enabled: false
endpoint: ""
timeout: 10s
metrics:
concurrent_blocks: 2
time_overlap_cutoff: 0.2
Expand Down
1 change: 1 addition & 0 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type TraceByIDConfig struct {
QueryShards int `yaml:"query_shards,omitempty"`
ConcurrentShards int `yaml:"concurrent_shards,omitempty"`
SLO SLOConfig `yaml:",inline"`
ExternalEnabled bool `yaml:"external_enabled,omitempty"`

// RF1After specifies the time after which RF1 logic is applied, injected by the configuration
// or determined at runtime based on search request parameters.
Expand Down
35 changes: 29 additions & 6 deletions modules/frontend/traceid_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@

func newAsyncTraceIDSharder(cfg *TraceByIDConfig, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
// Calculate block boundaries:
// - If external is enabled: N-2 block shards (1 ingester + 1 external + N-2 blocks = N total)
// - If external is disabled: N-1 block shards (1 ingester + N-1 blocks = N total)
numBlockShards := cfg.QueryShards - 1
if cfg.ExternalEnabled {
numBlockShards = cfg.QueryShards - 2
}

Check notice on line 37 in modules/frontend/traceid_sharder.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 36-37 are not covered by tests
return asyncTraceSharder{
next: next,
cfg: cfg,
logger: logger,
blockBoundaries: blockboundary.CreateBlockBoundaries(cfg.QueryShards - 1), // one shard will be used to query ingesters
blockBoundaries: blockboundary.CreateBlockBoundaries(numBlockShards),
}
})
}
Expand Down Expand Up @@ -76,16 +83,18 @@
return nil, err
}

reqs := make([]pipeline.Request, s.cfg.QueryShards)
reqs := make([]pipeline.Request, 0, s.cfg.QueryShards)
params := map[string]string{}

reqs[0], err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
// Job 0: ingester job
req, err := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
params[querier.QueryModeKey] = querier.QueryModeIngesters
return api.BuildQueryRequest(r, params), nil
})
if err != nil {
return nil, err
}
reqs = append(reqs, req)

var rf1After string
if val := parent.HTTPRequest().URL.Query().Get(api.URLParamRF1After); val != "" {
Expand All @@ -94,7 +103,22 @@
rf1After = s.cfg.RF1After.Format(time.RFC3339)
}

// build sharded block queries
// Job 1: external job (if enabled)
if s.cfg.ExternalEnabled {
req, err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
params[querier.QueryModeKey] = querier.QueryModeExternal
return api.BuildQueryRequest(r, params), nil
})
if err != nil {
return nil, err
}

Check notice on line 114 in modules/frontend/traceid_sharder.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 113-114 are not covered by tests
reqs = append(reqs, req)
}

// Jobs 2 to N-1: block queries
// When external is enabled, we have N-2 block shards
// When external is disabled, we have N-1 block shards
// blockBoundaries has length equal to numBlockShards, and we create shards between boundaries
for i := 1; i < len(s.blockBoundaries); i++ {
i := i // save the loop variable locally to make sure the closure grabs the correct var.
pipelineR, _ := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
Expand All @@ -106,8 +130,7 @@

return api.BuildQueryRequest(r, params), nil
})

reqs[i] = pipelineR
reqs = append(reqs, pipelineR)
}

return reqs, nil
Expand Down
29 changes: 29 additions & 0 deletions modules/frontend/traceid_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,32 @@ func TestBuildShardedRequests(t *testing.T) {
require.Equal(t, "/querier?mode=ingesters", shardedReqs[0].HTTPRequest().RequestURI)
urisEqual(t, []string{"/querier?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks"}, []string{shardedReqs[1].HTTPRequest().RequestURI})
}

func TestBuildShardedRequestsWithExternal(t *testing.T) {
queryShards := 4

sharder := &asyncTraceSharder{
cfg: &TraceByIDConfig{
QueryShards: queryShards,
ExternalEnabled: true,
},
blockBoundaries: blockboundary.CreateBlockBoundaries(queryShards - 2),
}

ctx := user.InjectOrgID(context.Background(), "blerg")
req := httptest.NewRequest("GET", "/", nil).WithContext(ctx)

shardedReqs, err := sharder.buildShardedRequests(pipeline.NewHTTPRequest(req))
require.NoError(t, err)
require.Len(t, shardedReqs, queryShards)

require.Equal(t, "/querier?mode=ingesters", shardedReqs[0].HTTPRequest().RequestURI)
require.Equal(t, "/querier?mode=external", shardedReqs[1].HTTPRequest().RequestURI)

// Verify block shard requests
for i := 2; i < queryShards; i++ {
require.Contains(t, shardedReqs[i].HTTPRequest().RequestURI, "mode=blocks")
require.Contains(t, shardedReqs[i].HTTPRequest().RequestURI, "blockStart")
require.Contains(t, shardedReqs[i].HTTPRequest().RequestURI, "blockEnd")
}
}
11 changes: 10 additions & 1 deletion modules/querier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
}

type TraceByIDConfig struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
QueryTimeout time.Duration `yaml:"query_timeout"`
External ExternalConfig `yaml:"external"`
}

type MetricsConfig struct {
Expand All @@ -53,9 +54,17 @@
PreferredZone string `yaml:"preferred_zone,omitempty"`
}

// ExternalConfig configures the optional external trace source consulted by
// trace-by-ID queries.
type ExternalConfig struct {
// Enabled turns on querying the external endpoint for trace data.
Enabled bool `yaml:"enabled"`
// Endpoint is the URL of the external service.
Endpoint string `yaml:"endpoint"` // e.g., "http://external-service:3200"
// Timeout bounds requests made to the external endpoint.
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlagsAndApplyDefaults register flags.
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.TraceByID.QueryTimeout = 10 * time.Second
cfg.TraceByID.External.Enabled = false
cfg.TraceByID.External.Timeout = 10 * time.Second

Check notice on line 67 in modules/querier/config.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 66-67 are not covered by tests
cfg.QueryRelevantIngesters = false
cfg.ExtraQueryDelay = 0
cfg.MaxConcurrentQueries = 20
Expand Down
110 changes: 110 additions & 0 deletions modules/querier/external/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package external

import (
"context"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
)

// metricExternalRequestDuration is a histogram of the time spent on requests to
// the external endpoint, in seconds. It carries a single "status_code" label.
var metricExternalRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
Name: "querier_external_endpoint_request_duration_seconds",
Help: "Duration of requests to the external endpoint in seconds.",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"status_code"})

// Client issues trace lookups against an external HTTP endpoint.
type Client struct {
// httpClient performs the outbound HTTP requests.
httpClient *http.Client
// externalURL is the parsed endpoint URL onto which request paths are joined.
externalURL *url.URL
}

func NewClient(endpoint string, timeout time.Duration) (*Client, error) {
externalURL, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid external endpoint URL: %w", err)
}

Check notice on line 42 in modules/querier/external/client.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 41-42 are not covered by tests

return &Client{
httpClient: &http.Client{
Timeout: timeout,
Transport: otelhttp.NewTransport(http.DefaultTransport),
},
externalURL: externalURL,
}, nil
}

// TraceByID forwards a trace-by-ID request to the external endpoint
Comment thread
Logiraptor marked this conversation as resolved.
Outdated
// traceID is the trace ID to query
// startTime and endTime are Unix timestamps in seconds (0 means not specified)
func (c *Client) TraceByID(ctx context.Context, userID string, traceID []byte, startTime, endTime int64) (*tempopb.TraceByIDResponse, error) {
start := time.Now()
statusCode := "error"
defer func() {
metricExternalRequestDuration.WithLabelValues(statusCode).Observe(time.Since(start).Seconds())
}()

path := c.externalURL.JoinPath(strings.Replace(api.PathTracesV2, "{traceID}", hex.EncodeToString(traceID), 1))

// Add query parameters for start/end times
q := path.Query()
if startTime != 0 {
q.Set("start", strconv.FormatInt(startTime, 10))
}
if endTime != 0 {
q.Set("end", strconv.FormatInt(endTime, 10))
}
path.RawQuery = q.Encode()

httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, path.String(), nil)
if err != nil {
return nil, fmt.Errorf("failed to create external request: %w", err)
}

Check notice on line 78 in modules/querier/external/client.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 77-78 are not covered by tests

httpReq.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)
httpReq.Header.Set(user.OrgIDHeaderName, userID)

resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("external endpoint request failed: %w", err)
}

Check notice on line 86 in modules/querier/external/client.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 85-86 are not covered by tests
defer resp.Body.Close()

// Set the status code for the metric tracking in defer
statusCode = strconv.Itoa(resp.StatusCode)

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read external response body: %w", err)
}

Check notice on line 95 in modules/querier/external/client.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 94-95 are not covered by tests

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
return nil, fmt.Errorf("external endpoint returned status %d: %s", resp.StatusCode, string(body))
}

Check notice on line 99 in modules/querier/external/client.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 98-99 are not covered by tests

var trace tempopb.Trace
Comment thread
Logiraptor marked this conversation as resolved.
err = trace.Unmarshal(body)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal external response: %w", err)
}

Check notice on line 105 in modules/querier/external/client.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 104-105 are not covered by tests

return &tempopb.TraceByIDResponse{
Trace: &trace,
}, nil
}
83 changes: 83 additions & 0 deletions modules/querier/external/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package external

import (
"context"
"encoding/hex"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/stretchr/testify/require"
)

// TestClient_TraceByID verifies that Client.TraceByID sends the expected
// request (method, path, headers and start/end query parameters) and decodes
// the protobuf trace returned by the server.
//
// The server handler runs on its own goroutine, where require (t.FailNow) must
// not be called. The handler therefore only records what it received, and all
// assertions run on the test goroutine.
func TestClient_TraceByID(t *testing.T) {
	traceID := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}
	userID := "test-tenant"
	expectedPath := "/api/v2/traces/" + hex.EncodeToString(traceID)

	// Create a test trace to return
	testTrace := &tempopb.Trace{
		ResourceSpans: []*v1_trace.ResourceSpans{
			{
				ScopeSpans: []*v1_trace.ScopeSpans{
					{
						Spans: []*v1_trace.Span{
							{
								TraceId: traceID,
								SpanId:  []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08},
								Name:    "test-span",
							},
						},
					},
				},
			},
		},
	}

	// Marshal up front, on the test goroutine, so a failure stops the test.
	traceBytes, err := testTrace.Marshal()
	require.NoError(t, err)

	// capturedRequest holds the parts of the incoming request under test.
	type capturedRequest struct {
		method string
		path   string
		accept string
		orgID  string
		start  string
		end    string
	}
	captured := make(chan capturedRequest, 1)

	// Create httptest server that records the request and returns the trace
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		got := capturedRequest{
			method: r.Method,
			path:   r.URL.Path,
			accept: r.Header.Get(api.HeaderAccept),
			orgID:  r.Header.Get(user.OrgIDHeaderName),
			start:  r.URL.Query().Get("start"),
			end:    r.URL.Query().Get("end"),
		}
		// Non-blocking send: a second, unexpected request must not hang the
		// handler. t.Errorf (unlike FailNow) is safe from this goroutine.
		select {
		case captured <- got:
		default:
			t.Errorf("unexpected additional request: %s %s", r.Method, r.URL.Path)
		}

		w.Header().Set("Content-Type", api.HeaderAcceptProtobuf)
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write(traceBytes); err != nil {
			t.Errorf("writing response: %v", err)
		}
	}))
	defer server.Close()

	// Create client
	client, err := NewClient(server.URL, 10*time.Second)
	require.NoError(t, err)

	// Call TraceByID
	ctx := context.Background()
	resp, err := client.TraceByID(ctx, userID, traceID, 123, 456)
	require.NoError(t, err)

	// Validate the request the server saw
	var got capturedRequest
	select {
	case got = <-captured:
	default:
		t.Fatal("external endpoint was never called")
	}
	require.Equal(t, expectedPath, got.path, "path should match expected trace path")
	require.Equal(t, api.HeaderAcceptProtobuf, got.accept, "Accept header should be protobuf")
	require.Equal(t, userID, got.orgID, "X-Scope-OrgID header should match userID")
	require.Equal(t, http.MethodGet, got.method, "method should be GET")
	require.Equal(t, "123", got.start, "start query parameter should be 123")
	require.Equal(t, "456", got.end, "end query parameter should be 456")

	// Validate the decoded response
	require.NotNil(t, resp)
	require.NotNil(t, resp.Trace)
	require.Len(t, resp.Trace.ResourceSpans, 1)
	require.Len(t, resp.Trace.ResourceSpans[0].ScopeSpans, 1)
	require.Len(t, resp.Trace.ResourceSpans[0].ScopeSpans[0].Spans, 1)
	require.Equal(t, "test-span", resp.Trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Name)
}
1 change: 1 addition & 0 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
QueryModeBlocks = "blocks"
QueryModeAll = "all"
QueryModeRecent = "recent"
QueryModeExternal = "external"
)

// TraceByIDHandler is a http.HandlerFunc to retrieve traces
Expand Down
Loading