Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [CHANGE] Do not count cached querier responses for SLO metrics such as inspected bytes [#5185](https://github.com/grafana/tempo/pull/5185) (@carles-grafana)
* [CHANGE] Assert max live traces limits in local-blocks processor [#5170](https://github.com/grafana/tempo/pull/5170) (@mapno)
* [ENHANCEMENT] Include backendwork dashboard and include additional alert [#5159](https://github.com/grafana/tempo/pull/5159) (@zalegrala)
* [ENHANCEMENT] Add endpoint for partition downscaling [#4913](https://github.com/grafana/tempo/pull/4913) (@mapno)
* [ENHANCEMENT] Add alert for high error rate reported by vulture [#5206](https://github.com/grafana/tempo/pull/5206) (@ruslan-mikhailov)
* [BUGFIX] Add nil check to partitionAssignmentVar [#5198](https://github.com/grafana/tempo/pull/5198) (@mapno)

Expand Down
3 changes: 3 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ func (t *App) initIngester() (services.Service, error) {
tempopb.RegisterQuerierServer(t.Server.GRPC(), t.ingester)
t.Server.HTTPRouter().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.Server.HTTPRouter().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
t.Server.HTTPRouter().Methods(http.MethodGet, http.MethodPost, http.MethodDelete).
Path("/ingester/prepare-partition-downscale").
Handler(http.HandlerFunc(t.ingester.PreparePartitionDownscaleHandler))
return t.ingester, nil
}

Expand Down
17 changes: 17 additions & 0 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ For externally supported gRPC API, [refer to Tempo gRPC API](#tempo-grpc-api).
| Memberlist | Distributor, Ingester, Querier, Compactor | HTTP | `GET /memberlist` |
| [Flush](#flush) | Ingester | HTTP | `GET,POST /flush` |
| [Shutdown](#shutdown) | Ingester | HTTP | `GET,POST /shutdown` |
| [Prepare partition downscale](#prepare-partition-downscale) | Ingester | HTTP | `GET,POST,DELETE /ingester/prepare-partition-downscale` |
| [Usage Metrics](#usage-metrics) | Distributor | HTTP | `GET /usage_metrics` |
| [Distributor ring status](#distributor-ring-status) (*) | Distributor | HTTP | `GET /distributor/ring` |
| [Ingesters ring status](#ingesters-ring-status) | Distributor, Querier | HTTP | `GET /ingester/ring` |
Expand Down Expand Up @@ -740,6 +741,22 @@ ingester service.
This is usually used at the time of scaling down a cluster.
{{< /admonition >}}

### Prepare partition downscale

```
GET,POST,DELETE /ingester/prepare-partition-downscale
```

This endpoint prepares the ingester's partition for downscaling by setting it to the `INACTIVE` state.

A `GET` call to this endpoint returns the timestamp of when the partition was switched to the `INACTIVE` state, or 0 if the partition is not in the `INACTIVE` state.

A `POST` call switches this ingester's partition to the `INACTIVE` state, if it isn't `INACTIVE` already, and returns the timestamp of when the switch to the `INACTIVE` state occurred.

A `DELETE` call sets the partition back from the `INACTIVE` to the `ACTIVE` state.

If the ingester is not configured to use ingest-storage, any call to this endpoint fails.

### Usage metrics

{{< admonition type="note" >}}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e
github.com/grafana/e2e v0.1.1
github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e
github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/jaegertracing/jaeger v1.67.0
github.com/jedib0t/go-pretty/v6 v6.6.7
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e h1:VNYS3rQLs+h6obQppkTURXepE6hZv6cnV8msJGs9oHw=
github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e/go.mod h1:XgCfkrcj96DEvYCdTVTamQSo4vLTqoXrMZw5kUeWDAQ=
github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc=
github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE=
github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e h1:xszW9jlqA2Fsz1fXPc3aP494z3lRheGywP1pkFLuxbY=
github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e/go.mod h1:XgCfkrcj96DEvYCdTVTamQSo4vLTqoXrMZw5kUeWDAQ=
github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673 h1:Va04sDlP33f1SFUHRTho4QJfDlGTz+/HnBIpYwlqV9Q=
github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/gomemcache v0.0.0-20250318131618-74242eea118d h1:oXRJlb9UjVsl6LhqBdbyAQ9YFhExwsj4bjh5vwMNRZY=
github.com/grafana/gomemcache v0.0.0-20250318131618-74242eea118d/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw=
github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ=
Expand Down
98 changes: 98 additions & 0 deletions integration/e2e/ingest/config-partition-downscale.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# E2E test configuration for the partition-downscale integration test.
# Runs Tempo with Kafka-based ingest storage (ingest.enabled) so the
# ingester's partition can be switched INACTIVE/ACTIVE via the
# /ingester/prepare-partition-downscale endpoint.

server:
  http_listen_port: 3200
  log_level: info

distributor:
  receivers:
    jaeger:
      protocols:
        grpc:
          endpoint: "distributor:14250"

query_frontend:
  # All data is RF1 from the epoch, i.e. everything is served via ingest storage.
  rf1_after: "1970-01-01T00:00:00Z"
  search:
    query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend
    query_ingesters_until: 0

ingester:
  lifecycler:
    min_ready_duration: 1s
    ring:
      kvstore:
        store: memberlist
      replication_factor: 1
  # Partition ring used by ingest storage; its state is what the
  # prepare-partition-downscale endpoint manipulates.
  partition_ring:
    kvstore:
      store: memberlist
  # The block-builder, not the ingester, flushes blocks to object storage here.
  flush_object_storage: false

querier:
  frontend_worker:
    frontend_address: tempo_e2e-query-frontend:9095

metrics_generator:
  registry:
    external_labels:
      source: tempo
      cluster: docker-compose
  storage:
    path: /var/tempo/generator/wal
    remote_write:
      - url: http://prometheus:9090/api/v1/write
        send_exemplars: true
  traces_storage:
    path: /var/tempo/generator/traces
  traces_query_storage:
    path: /var/tempo/generator/query_traces
  processor:
    local_blocks:
      flush_to_storage: true


memberlist:
  bind_port: 7946
  join_members:
    - tempo_e2e-distributor:7946
    - tempo_e2e-ingester-1:7946
    - tempo_e2e-ingester-2:7946
    - tempo_e2e-metrics-generator:7946

storage:
  trace:
    blocklist_poll: 2s
    blocklist_poll_stale_tenant_index: 1s # force components to always fall back to polling
    backend: s3
    s3:
      bucket: tempo
      endpoint: tempo_e2e-minio-9000:9000 # TODO: this is brittle, fix this eventually
      access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey
      secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey
      insecure: true
    pool:
      max_workers: 10
      queue_depth: 100


overrides:
  defaults:
    metrics_generator:
      processors: [local-blocks]
  user_configurable_overrides:
    enabled: true
    poll_interval: 10s
    client:
      backend: local
      local:
        path: /var/tempo/overrides

block_builder:
  consume_cycle_duration: 1m
  assigned_partitions:
    block-builder-0: [0]

ingest:
  enabled: true
  kafka:
    address: kafka:9092
    topic: tempo-ingest
3 changes: 2 additions & 1 deletion integration/e2e/ingest/ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/httpclient"
tempoUtil "github.com/grafana/tempo/pkg/util"
Expand All @@ -21,7 +22,7 @@ func TestIngest(t *testing.T) {
// copy config template to shared directory and expand template variables
require.NoError(t, util.CopyFileToSharedDir(s, "config-kafka.yaml", "config.yaml"))

kafka := NewKafka()
kafka := e2edb.NewKafka()
require.NoError(t, s.StartAndWaitReady(kafka))

tempo := util.NewTempoAllInOne()
Expand Down
42 changes: 0 additions & 42 deletions integration/e2e/ingest/kafka.go

This file was deleted.

150 changes: 150 additions & 0 deletions integration/e2e/ingest/partition_downscale_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package ingest

import (
"encoding/json"
"net/http"
"testing"
"time"

"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/httpclient"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

// TestPartitionDownscale exercises the /ingester/prepare-partition-downscale
// endpoint end to end: it switches the ingester's partition to the INACTIVE
// state, verifies the block-builder still consumes and flushes that
// partition's data to backend storage, then reactivates the partition and
// verifies new traces are ingested and queryable.
func TestPartitionDownscale(t *testing.T) {
	s, err := e2e.NewScenario("tempo_e2e")
	require.NoError(t, err)
	defer s.Close()

	// copy config template to shared directory and expand template variables
	require.NoError(t, util.CopyFileToSharedDir(s, "config-partition-downscale.yaml", "config.yaml"))

	// Start dependencies
	kafka := e2edb.NewKafka()
	require.NoError(t, s.StartAndWaitReady(kafka))

	minio := e2edb.NewMinio(9000, "tempo")
	require.NotNil(t, minio)
	require.NoError(t, s.StartAndWaitReady(minio))

	// Start Tempo components
	distributor := util.NewTempoDistributor()
	ingester := util.NewTempoIngester(0)
	querier := util.NewTempoQuerier()
	queryFrontend := util.NewTempoQueryFrontend()

	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier, queryFrontend))

	// Wait until the ingester is ACTIVE in the ring
	isServiceActiveMatcher := func(service string) []*labels.Matcher {
		return []*labels.Matcher{
			labels.MustNewMatcher(labels.MatchEqual, "name", service),
			labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
		}
	}
	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(isServiceActiveMatcher("ingester")...), e2e.WaitMissingMetrics))

	// Wait until joined to partition ring
	partitionStateMatchers := func(state string) []*labels.Matcher {
		return []*labels.Matcher{
			{Type: labels.MatchEqual, Name: "state", Value: state},
			{Type: labels.MatchEqual, Name: "name", Value: "ingester-partitions"},
		}
	}
	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempo_partition_ring_partitions"}, e2e.WithLabelMatchers(partitionStateMatchers("Active")...)))

	// Get port for the Jaeger gRPC receiver endpoint
	c, err := util.NewJaegerGRPCClient(distributor.Endpoint(14250))
	require.NoError(t, err)
	require.NotNil(t, c)

	// Generate and emit initial traces
	info := tempoUtil.NewTraceInfo(time.Now(), "")
	require.NoError(t, info.EmitAllBatches(c))

	// Wait for traces to be received
	expected, err := info.ConstructTraceFromEpoch()
	require.NoError(t, err)
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total"))

	// Create API client
	apiClient := httpclient.New("http://"+queryFrontend.Endpoint(3200), "")

	// downscale issues a call with the given HTTP method against the
	// ingester's prepare-partition-downscale endpoint, asserts a 200
	// response, and for GET returns the decoded JSON body (nil otherwise).
	// The response body is always closed to avoid leaking connections.
	downscale := func(method string) map[string]any {
		req, err := http.NewRequest(method, "http://"+ingester.Endpoint(3200)+"/ingester/prepare-partition-downscale", nil)
		require.NoError(t, err)
		httpResp, err := apiClient.Do(req)
		require.NoError(t, err)
		defer httpResp.Body.Close()
		require.Equal(t, http.StatusOK, httpResp.StatusCode)
		if method != http.MethodGet {
			return nil
		}
		var result map[string]any
		require.NoError(t, json.NewDecoder(httpResp.Body).Decode(&result))
		return result
	}

	// Set ingester's partition to INACTIVE state (prepare for downscale)
	downscale(http.MethodPost)

	// Verify ingester's partition is INACTIVE: a timestamp > 0 records when
	// it was marked for downscale.
	result := downscale(http.MethodGet)
	require.Greater(t, result["timestamp"].(float64), float64(0))

	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempo_partition_ring_partitions"}, e2e.WithLabelMatchers(partitionStateMatchers("Inactive")...)))

	// Start block-builder (it should consume data from the downscaled partition)
	blockbuilder := util.NewTempoBlockBuilder(0)
	require.NoError(t, s.StartAndWaitReady(blockbuilder))

	// Wait for blocks to be flushed from the downscaled partition
	require.NoError(t, blockbuilder.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempo_block_builder_flushed_blocks"}, e2e.WaitMissingMetrics))
	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempodb_blocklist_length"},
		e2e.WaitMissingMetrics, e2e.WithLabelMatchers(&labels.Matcher{Type: labels.MatchEqual, Name: "tenant", Value: "single-tenant"})))

	// Verify initial traces can be queried from backend storage
	trace, err := apiClient.QueryTrace(info.HexID())
	require.NoError(t, err)
	require.NotNil(t, trace)

	// Set ingester's partition back to ACTIVE state
	downscale(http.MethodDelete)

	// Verify ingester's partition is ACTIVE: timestamp == 0 means ACTIVE
	result = downscale(http.MethodGet)
	require.Equal(t, float64(0), result["timestamp"].(float64))

	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempo_partition_ring_partitions"}, e2e.WithLabelMatchers(partitionStateMatchers("Active")...)))

	// Generate and emit more traces after reactivating the partition
	info2 := tempoUtil.NewTraceInfo(time.Now(), "")
	require.NoError(t, info2.EmitAllBatches(c))

	// Wait for new traces to be received
	expected2, err := info2.ConstructTraceFromEpoch()
	require.NoError(t, err)
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)+util.SpanCount(expected2)), "tempo_distributor_spans_received_total"))

	// Wait for the new traces to be flushed by block-builder
	require.NoError(t, blockbuilder.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"tempo_block_builder_flushed_blocks"}, e2e.WaitMissingMetrics))

	// Verify all traces using trace ID lookup
	trace, err = apiClient.QueryTrace(info.HexID())
	require.NoError(t, err)
	require.NotNil(t, trace)
	require.Equal(t, util.SpanCount(expected), util.SpanCount(trace))

	trace2, err := apiClient.QueryTrace(info2.HexID())
	require.NoError(t, err)
	require.NotNil(t, trace2)
	require.Equal(t, util.SpanCount(expected2), util.SpanCount(trace2))
}
Loading
Loading