Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [ENHANCEMENT] Add querier metrics for requests executed [#3524](https://github.com/grafana/tempo/pull/3524) (@electron0zero)
* [FEATURE] Added gRPC streaming endpoints for all tag queries. [#3460](https://github.com/grafana/tempo/pull/3460) (@joe-elliott)
* [CHANGE] Align metrics query time ranges to the step parameter [#3490](https://github.com/grafana/tempo/pull/3490) (@mdisibio)
* [CHANGE] **Breaking Change** Remove trace by id hedging from the frontend [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* [CHANGE] Change the UID and GID of the `tempo` user to avoid root [#2265](https://github.com/grafana/tempo/pull/2265) (@zalegrala)
**BREAKING CHANGE** Ownership of /var/tempo is changing. Historically this
has been owned by root:root, and with this change it will now be owned by
Expand All @@ -26,7 +27,6 @@

* [BUGFIX] Fix compaction/retention in AWS S3 and GCS when a prefix is configured. [#3465](https://github.com/grafana/tempo/issues/3465) (@bpfoster)


## v2.4.0

* [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn)
Expand Down
8 changes: 0 additions & 8 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,14 +506,6 @@ query_frontend:
# (default: 0)
[concurrent_shards: <int>]

# If set to a non-zero value, a second request will be issued at the provided duration.
# Recommended to be set to p99 of search requests to reduce long-tail latency.
[hedge_requests_at: <duration> | default = 2s ]

# The maximum number of requests to execute when hedging.
# Requires hedge_requests_at to be set. Must be greater than 0.
[hedge_requests_up_to: <int> | default = 2 ]

# If set to a non-zero value, its value will be used to decide if query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds.
[duration_slo: <duration> | default = 0s ]
Expand Down
2 changes: 0 additions & 2 deletions docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,6 @@ query_frontend:
query_ingesters_until: 30m0s
trace_by_id:
query_shards: 50
Comment thread
electron0zero marked this conversation as resolved.
hedge_requests_at: 2s
hedge_requests_up_to: 2
metrics:
concurrent_jobs: 1000
target_bytes_per_job: 104857600
Expand Down
42 changes: 42 additions & 0 deletions integration/e2e/config-limits-429.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# e2e test config used by TestQueryRateLimits: the query frontend's
# max_outstanding_per_tenant is set to 0 so every query request is
# rejected with HTTP 429 / gRPC ResourceExhausted ("job queue full").
target: all
stream_over_http_enabled: true

server:
  http_listen_port: 3200

distributor:
  receivers:
    jaeger:
      protocols:
        grpc:

overrides:
  defaults:
    # deliberately tiny ingestion limits so limit behaviors are easy to trip
    ingestion:
      max_traces_per_user: 1
      rate_limit_bytes: 500
      burst_size_bytes: 500
    global:
      max_bytes_per_trace: 130

ingester:
  lifecycler:
    address: 127.0.0.1
    ring:
      kvstore:
        store: inmemory
    replication_factor: 1
    final_sleep: 0s
  # very long idle period keeps traces live in the ingester for the
  # duration of the test instead of being cut to blocks automatically
  trace_idle_period: 3600s

query_frontend:
  max_outstanding_per_tenant: 0 # forces everything to 429

storage:
  trace:
    backend: local
    local:
      path: /var/tempo
    pool:
      max_workers: 10
      queue_depth: 100
58 changes: 58 additions & 0 deletions integration/e2e/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

util "github.com/grafana/tempo/integration"
"github.com/grafana/tempo/pkg/httpclient"
"github.com/grafana/tempo/pkg/tempopb"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"

Expand All @@ -36,6 +37,7 @@ const (
configLimits = "config-limits.yaml"
configLimitsQuery = "config-limits-query.yaml"
configLimitsPartialError = "config-limits-partial-success.yaml"
configLimits429 = "config-limits-429.yaml"
)

func TestLimits(t *testing.T) {
Expand Down Expand Up @@ -321,3 +323,59 @@ func TestLimitsPartialSuccess(t *testing.T) {
)
require.NoError(t, err)
}

func TestQueryRateLimits(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, configLimits429, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

// Get port for the otlp receiver endpoint
c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)

// make a trace with 10 spans and push them one at a time, flush in between each one to force different blocks
batch := makeThriftBatchWithSpanCount(5)
allSpans := batch.Spans
for i := range batch.Spans {
batch.Spans = allSpans[i : i+1]
require.NoError(t, c.EmitBatch(context.Background(), batch))
callFlush(t, tempo)
time.Sleep(2 * time.Second) // trace idle and flush time are both 1ms
}
// now try to query it back. this should fail b/c the trace is too large
Comment thread
joe-elliott marked this conversation as resolved.
Outdated
client := httpclient.New("http://"+tempo.Endpoint(3200), tempoUtil.FakeTenantID)

// 429 HTTP Trace ID Lookup
traceID := []byte{0x01, 0x02}
_, err = client.QueryTrace(tempoUtil.TraceIDToHexString(traceID))
require.ErrorContains(t, err, "job queue full")
require.ErrorContains(t, err, "failed with response: 429")

start := time.Now().Add(-1 * time.Hour).Unix()
end := time.Now().Add(1 * time.Hour).Unix()

// 429 HTTP Search
_, err = client.SearchTraceQLWithRange("{}", start, end)
require.ErrorContains(t, err, "job queue full")
require.ErrorContains(t, err, "failed with response: 429")

// 429 GRPC Search
grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200))
require.NoError(t, err)

resp, err := grpcClient.Search(context.Background(), &tempopb.SearchRequest{
Query: "{}",
Start: uint32(start),
End: uint32(end),
})
require.NoError(t, err)

_, err = resp.Recv()
require.ErrorContains(t, err, "job queue full")
require.ErrorContains(t, err, "code = ResourceExhausted")
}
11 changes: 2 additions & 9 deletions integration/e2e/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,8 @@ func TestMultiTenantSearch(t *testing.T) {
resp, err := apiClient.QueryTrace(info.HexID())
require.NoError(t, err)
respTm := getAttrsAndSpanNames(resp)
if tc.tenantSize > 1 {
// resource keys should contain tenant key in case of a multi-tenant query
traceMap.rKeys = append(traceMap.rKeys, "tenant")
// resource values will contain at-least one of tenant ids for multi-tenant query
// or exactly match in case of single tenant query
assert.Subset(t, append(traceMap.rValues, tenants...), respTm.rValues)
} else {
assert.ElementsMatch(t, traceMap.rValues, respTm.rValues)
}

assert.ElementsMatch(t, traceMap.rValues, respTm.rValues)
assert.ElementsMatch(t, respTm.rKeys, traceMap.rKeys)
assert.ElementsMatch(t, traceMap.spanNames, respTm.spanNames)

Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type genericCombiner[T TResponse] struct {
}

// AddResponse is used to add a http response to the combiner.
func (c *genericCombiner[T]) AddResponse(res *http.Response, _ string) error {
func (c *genericCombiner[T]) AddResponse(res *http.Response) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/combiner/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Implementations must be thread-safe.
// TODO: StatusCode() and the tenant parameter on AddRequest are only used in for multi-tenant support. Can we remove them?
Comment thread
joe-elliott marked this conversation as resolved.
Outdated
type Combiner interface {
AddResponse(r *http.Response, tenant string) error
AddResponse(r *http.Response) error
StatusCode() int
ShouldQuit() bool

Expand Down
8 changes: 4 additions & 4 deletions modules/frontend/combiner/search_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func TestTagsCombiner(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
combiner := tc.factory(tc.limit)

err := combiner.AddResponse(toHTTPResponse(t, tc.result1, 200), "")
err := combiner.AddResponse(toHTTPResponse(t, tc.result1, 200))
assert.NoError(t, err)

err = combiner.AddResponse(toHTTPResponse(t, tc.result2, 200), "")
err = combiner.AddResponse(toHTTPResponse(t, tc.result2, 200))
assert.NoError(t, err)

res, err := combiner.HTTPFinal()
Expand Down Expand Up @@ -210,15 +210,15 @@ func TestTagValuesV2GRPCCombiner(t *testing.T) {
}

func testGRPCCombiner[T proto.Message](t *testing.T, combiner GRPCCombiner[T], result1 T, result2 T, diff1 T, diff2 T, expectedFinal T, sort func(T)) {
err := combiner.AddResponse(toHTTPResponse(t, result1, 200), "")
err := combiner.AddResponse(toHTTPResponse(t, result1, 200))
require.NoError(t, err)

actualDiff1, err := combiner.GRPCDiff()
require.NoError(t, err)
sort(actualDiff1)
require.Equal(t, diff1, actualDiff1)

err = combiner.AddResponse(toHTTPResponse(t, result2, 200), "")
err = combiner.AddResponse(toHTTPResponse(t, result2, 200))
assert.NoError(t, err)

actualDiff2, err := combiner.GRPCDiff()
Expand Down
28 changes: 14 additions & 14 deletions modules/frontend/combiner/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ func TestSearchProgressShouldQuit(t *testing.T) {

// 500 response should quit
c = NewSearch(0)
err := c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 500), "")
err := c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 500))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)

// 429 response should quit
c = NewSearch(0)
err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 429), "")
err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 429))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)

// unparseable body should not quit, but should return an error
c = NewSearch(0)
err = c.AddResponse(&http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}, "")
err = c.AddResponse(&http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200})
require.Error(t, err)
should = c.ShouldQuit()
require.False(t, should)
Expand All @@ -52,7 +52,7 @@ func TestSearchProgressShouldQuit(t *testing.T) {
TraceID: "1",
},
},
}, 200), "")
}, 200))
require.NoError(t, err)
should = c.ShouldQuit()
require.False(t, should)
Expand All @@ -68,7 +68,7 @@ func TestSearchProgressShouldQuit(t *testing.T) {
TraceID: "2",
},
},
}, 200), "")
}, 200))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestSearchCombinesResults(t *testing.T) {
},
Metrics: &tempopb.SearchMetrics{},
}, 200)
err := c.AddResponse(sr, "")
err := c.AddResponse(sr)
require.NoError(t, err)

expected := &tempopb.SearchResponse{
Expand Down Expand Up @@ -293,9 +293,9 @@ func TestSearchResponseCombiner(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
combiner := NewTypedSearch(20)

err := combiner.AddResponse(tc.response1, "")
err := combiner.AddResponse(tc.response1)
require.NoError(t, err)
err = combiner.AddResponse(tc.response2, "")
err = combiner.AddResponse(tc.response2)
require.NoError(t, err)

httpResp, err := combiner.HTTPFinal()
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestSearchDiffsResults(t *testing.T) {
require.Equal(t, expectedNoDiff, actual)

// add a trace and get it back in diff
err = c.AddResponse(sr, "")
err = c.AddResponse(sr)
require.NoError(t, err)

actual, err = c.GRPCDiff()
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestSearchDiffsResults(t *testing.T) {
},
}

err = c.AddResponse(sr2, "")
err = c.AddResponse(sr2)
require.NoError(t, err)

actual, err = c.GRPCDiff()
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestCombinerDiffs(t *testing.T) {
InspectedTraces: 1,
InspectedBytes: 2,
},
}, 200), "")
}, 200))
require.NoError(t, err)

// now we should get the same metadata as above
Expand Down Expand Up @@ -479,7 +479,7 @@ func TestCombinerDiffs(t *testing.T) {
InspectedTraces: 1,
InspectedBytes: 2,
},
}, 200), "")
}, 200))
require.NoError(t, err)

resp, err = combiner.GRPCDiff()
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestCombinerDiffs(t *testing.T) {
InspectedTraces: 1,
InspectedBytes: 2,
},
}, 200), "")
}, 200))
require.NoError(t, err)

resp, err = combiner.GRPCDiff()
Expand Down Expand Up @@ -574,7 +574,7 @@ func TestSearchCombinerDoesNotRace(t *testing.T) {
CompletedJobs: 1,
},
}, 200)
_ = combiner.AddResponse(resp, "")
_ = combiner.AddResponse(resp)
})

go concurrent(func() {
Expand Down
Loading