diff --git a/CHANGELOG.md b/CHANGELOG.md index 4194d77c95c..e64790de4cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ * [CHANGE] Do not count cached querier responses for SLO metrics such as inspected bytes [#5185](https://github.com/grafana/tempo/pull/5185) (@carles-grafana) * [CHANGE] Assert max live traces limits in local-blocks processor [#5170](https://github.com/grafana/tempo/pull/5170) (@mapno) * [ENHANCEMENT] Include backendwork dashboard and include additional alert [#5159](https://github.com/grafana/tempo/pull/5159) (@zalegrala) +* [ENHANCEMENT] Add endpoint for partition downscaling [#4913](https://github.com/grafana/tempo/pull/4913) (@mapno) * [ENHANCEMENT] Add alert for high error rate reported by vulture [#5206](https://github.com/grafana/tempo/pull/5206) (@ruslan-mikhailov) * [BUGFIX] Add nil check to partitionAssignmentVar [#5198](https://github.com/grafana/tempo/pull/5198) (@mapno) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 76da4e9bd6d..9fc44050b86 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -299,6 +299,9 @@ func (t *App) initIngester() (services.Service, error) { tempopb.RegisterQuerierServer(t.Server.GRPC(), t.ingester) t.Server.HTTPRouter().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) t.Server.HTTPRouter().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler)) + t.Server.HTTPRouter().Methods(http.MethodGet, http.MethodPost, http.MethodDelete). + Path("/ingester/prepare-partition-downscale"). 
+ Handler(http.HandlerFunc(t.ingester.PreparePartitionDownscaleHandler)) return t.ingester, nil } diff --git a/docs/sources/tempo/api_docs/_index.md b/docs/sources/tempo/api_docs/_index.md index d869ca5b529..e32dc46163d 100644 --- a/docs/sources/tempo/api_docs/_index.md +++ b/docs/sources/tempo/api_docs/_index.md @@ -45,6 +45,7 @@ For externally supported gRPC API, [refer to Tempo gRPC API](#tempo-grpc-api). | Memberlist | Distributor, Ingester, Querier, Compactor | HTTP | `GET /memberlist` | | [Flush](#flush) | Ingester | HTTP | `GET,POST /flush` | | [Shutdown](#shutdown) | Ingester | HTTP | `GET,POST /shutdown` | +| [Prepare partition downscale](#prepare-partition-downscale) | Ingester | HTTP | `GET,POST,DELETE /ingester/prepare-partition-downscale` | | [Usage Metrics](#usage-metrics) | Distributor | HTTP | `GET /usage_metrics` | | [Distributor ring status](#distributor-ring-status) (*) | Distributor | HTTP | `GET /distributor/ring` | | [Ingesters ring status](#ingesters-ring-status) | Distributor, Querier | HTTP | `GET /ingester/ring` | @@ -740,6 +741,22 @@ ingester service. This is usually used at the time of scaling down a cluster. {{< /admonition >}} +### Prepare partition downscale + +``` +GET,POST,DELETE /ingester/prepare-partition-downscale +``` + +This endpoint prepares the ingester's partition for downscaling by setting it to the `INACTIVE` state. + +A `GET` call to this endpoint returns a timestamp of when the partition was switched to the `INACTIVE` state, or 0, if the partition is not in the `INACTIVE` state. + +A `POST` call switches this ingester's partition to the `INACTIVE` state, if it isn't `INACTIVE` already, and returns the timestamp of when the switch to the `INACTIVE` state occurred. + +A `DELETE` call sets the partition back from the `INACTIVE` to the `ACTIVE` state. + +If the ingester is not configured to use ingest-storage, any call to this endpoint fails. 
+ ### Usage metrics {{< admonition type="note" >}} diff --git a/go.mod b/go.mod index bf1394ddd74..35bd3ae1825 100644 --- a/go.mod +++ b/go.mod @@ -24,8 +24,8 @@ require ( github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e - github.com/grafana/e2e v0.1.1 + github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e + github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673 github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/jaegertracing/jaeger v1.67.0 github.com/jedib0t/go-pretty/v6 v6.6.7 diff --git a/go.sum b/go.sum index dd8b5a37166..16ecbedfdef 100644 --- a/go.sum +++ b/go.sum @@ -394,10 +394,10 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e h1:VNYS3rQLs+h6obQppkTURXepE6hZv6cnV8msJGs9oHw= -github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e/go.mod h1:XgCfkrcj96DEvYCdTVTamQSo4vLTqoXrMZw5kUeWDAQ= -github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc= -github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE= +github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e h1:xszW9jlqA2Fsz1fXPc3aP494z3lRheGywP1pkFLuxbY= +github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e/go.mod h1:XgCfkrcj96DEvYCdTVTamQSo4vLTqoXrMZw5kUeWDAQ= 
+github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673 h1:Va04sDlP33f1SFUHRTho4QJfDlGTz+/HnBIpYwlqV9Q= +github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/gomemcache v0.0.0-20250318131618-74242eea118d h1:oXRJlb9UjVsl6LhqBdbyAQ9YFhExwsj4bjh5vwMNRZY= github.com/grafana/gomemcache v0.0.0-20250318131618-74242eea118d/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw= github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ= diff --git a/integration/e2e/ingest/config-partition-downscale.yaml b/integration/e2e/ingest/config-partition-downscale.yaml new file mode 100644 index 00000000000..b8d9d4e3944 --- /dev/null +++ b/integration/e2e/ingest/config-partition-downscale.yaml @@ -0,0 +1,98 @@ +server: + http_listen_port: 3200 + log_level: info + +distributor: + receivers: + jaeger: + protocols: + grpc: + endpoint: "distributor:14250" + +query_frontend: + rf1_after: "1970-01-01T00:00:00Z" + search: + query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend + query_ingesters_until: 0 + +ingester: + lifecycler: + min_ready_duration: 1s + ring: + kvstore: + store: memberlist + replication_factor: 1 + partition_ring: + kvstore: + store: memberlist + flush_object_storage: false + +querier: + frontend_worker: + frontend_address: tempo_e2e-query-frontend:9095 + +metrics_generator: + registry: + external_labels: + source: tempo + cluster: docker-compose + storage: + path: /var/tempo/generator/wal + remote_write: + - url: http://prometheus:9090/api/v1/write + send_exemplars: true + traces_storage: + path: /var/tempo/generator/traces + traces_query_storage: + path: /var/tempo/generator/query_traces + processor: + local_blocks: + flush_to_storage: true + + +memberlist: + bind_port: 7946 + join_members: + - 
tempo_e2e-distributor:7946 + - tempo_e2e-ingester-1:7946 + - tempo_e2e-ingester-2:7946 + - tempo_e2e-metrics-generator:7946 + +storage: + trace: + blocklist_poll: 2s + blocklist_poll_stale_tenant_index: 1s # force components to always fall back to polling + backend: s3 + s3: + bucket: tempo + endpoint: tempo_e2e-minio-9000:9000 # TODO: this is brittle, fix this eventually + access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey + secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey + insecure: true + pool: + max_workers: 10 + queue_depth: 100 + + +overrides: + defaults: + metrics_generator: + processors: [local-blocks] + user_configurable_overrides: + enabled: true + poll_interval: 10s + client: + backend: local + local: + path: /var/tempo/overrides + +block_builder: + consume_cycle_duration: 1m + assigned_partitions: + block-builder-0: [0] + +ingest: + enabled: true + kafka: + address: kafka:9092 + topic: tempo-ingest \ No newline at end of file diff --git a/integration/e2e/ingest/ingest_storage_test.go b/integration/e2e/ingest/ingest_storage_test.go index 13a048e9c01..c1e4896e3c9 100644 --- a/integration/e2e/ingest/ingest_storage_test.go +++ b/integration/e2e/ingest/ingest_storage_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/grafana/e2e" + e2edb "github.com/grafana/e2e/db" "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/httpclient" tempoUtil "github.com/grafana/tempo/pkg/util" @@ -21,7 +22,7 @@ func TestIngest(t *testing.T) { // copy config template to shared directory and expand template variables require.NoError(t, util.CopyFileToSharedDir(s, "config-kafka.yaml", "config.yaml")) - kafka := NewKafka() + kafka := e2edb.NewKafka() require.NoError(t, s.StartAndWaitReady(kafka)) tempo := util.NewTempoAllInOne() diff --git a/integration/e2e/ingest/kafka.go b/integration/e2e/ingest/kafka.go deleted file mode 100644 index 4b32ce7f70b..00000000000 
--- a/integration/e2e/ingest/kafka.go +++ /dev/null @@ -1,42 +0,0 @@ -package ingest - -import ( - "github.com/grafana/e2e" -) - -const kafkaImage = "confluentinc/cp-kafka:latest" - -func NewKafka() *e2e.HTTPService { - envVars := map[string]string{ - "CLUSTER_ID": "zH1GDqcNTzGMDCXm5VZQdg", - "KAFKA_BROKER_ID": "1", - "KAFKA_NUM_PARTITIONS": "2", - "KAFKA_PROCESS_ROLES": "broker,controller", - "KAFKA_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093,PLAINTEXT_HOST://:29092", - "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092", - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT", - "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT", - "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", - "KAFKA_CONTROLLER_QUORUM_VOTERS": "1@kafka:9093", - "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", - "KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS": "10000", - } - - service := e2e.NewConcreteService( - "kafka", - kafkaImage, - e2e.NewCommand("/etc/confluent/docker/run"), - // e2e.NewCmdReadinessProbe(e2e.NewCommand("kafka-topics", "--bootstrap-server", "broker:29092", "--list")), - e2e.NewCmdReadinessProbe(e2e.NewCommand("sh", "-c", "nc -z localhost 9092 || exit 1")), // TODO: A bit unstable, sometimes it fails - 9092, - 29092, - ) - - service.SetEnvVars(envVars) - - httpService := &e2e.HTTPService{ - ConcreteService: service, - } - - return httpService -} diff --git a/integration/e2e/ingest/partition_downscale_test.go b/integration/e2e/ingest/partition_downscale_test.go new file mode 100644 index 00000000000..467b008651b --- /dev/null +++ b/integration/e2e/ingest/partition_downscale_test.go @@ -0,0 +1,150 @@ +package ingest + +import ( + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/grafana/e2e" + e2edb "github.com/grafana/e2e/db" + "github.com/grafana/tempo/integration/util" + 
"github.com/grafana/tempo/pkg/httpclient" + tempoUtil "github.com/grafana/tempo/pkg/util" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestPartitionDownscale(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + // copy config template to shared directory and expand template variables + require.NoError(t, util.CopyFileToSharedDir(s, "config-partition-downscale.yaml", "config.yaml")) + + // Start dependencies + kafka := e2edb.NewKafka() + require.NoError(t, s.StartAndWaitReady(kafka)) + + minio := e2edb.NewMinio(9000, "tempo") + require.NotNil(t, minio) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start Tempo components + distributor := util.NewTempoDistributor() + ingester := util.NewTempoIngester(0) + querier := util.NewTempoQuerier() + queryFrontend := util.NewTempoQueryFrontend() + + require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier, queryFrontend)) + + // Wait until ingester and metrics-generator are active + isServiceActiveMatcher := func(service string) []*labels.Matcher { + return []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "name", service), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"), + } + } + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(isServiceActiveMatcher("ingester")...), e2e.WaitMissingMetrics)) + + // Wait until joined to partition ring + partitionStateMatchers := func(state string) []*labels.Matcher { + return []*labels.Matcher{ + {Type: labels.MatchEqual, Name: "state", Value: state}, + {Type: labels.MatchEqual, Name: "name", Value: "ingester-partitions"}, + } + } + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempo_partition_ring_partitions"}, 
e2e.WithLabelMatchers(partitionStateMatchers("Active")...))) + + // Get port for the Jaeger gRPC receiver endpoint + c, err := util.NewJaegerGRPCClient(distributor.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c) + + // Generate and emit initial traces + info := tempoUtil.NewTraceInfo(time.Now(), "") + require.NoError(t, info.EmitAllBatches(c)) + + // Wait for traces to be received + expected, err := info.ConstructTraceFromEpoch() + require.NoError(t, err) + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total")) + + // Create API client + apiClient := httpclient.New("http://"+queryFrontend.Endpoint(3200), "") + + // Set ingester's partition to INACTIVE state (prepare for downscale) + req, err := http.NewRequest("POST", "http://"+ingester.Endpoint(3200)+"/ingester/prepare-partition-downscale", nil) + require.NoError(t, err) + httpResp, err := apiClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, httpResp.StatusCode) + + // Verify ingester's partition is INACTIVE by checking the timestamp + req, err = http.NewRequest("GET", "http://"+ingester.Endpoint(3200)+"/ingester/prepare-partition-downscale", nil) + require.NoError(t, err) + httpResp, err = apiClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, httpResp.StatusCode) + var result map[string]interface{} + require.NoError(t, json.NewDecoder(httpResp.Body).Decode(&result)) + require.Greater(t, result["timestamp"].(float64), float64(0)) // ts > 0 ==> INACTIVE (when it was marked for downscale) + + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempo_partition_ring_partitions"}, e2e.WithLabelMatchers(partitionStateMatchers("Inactive")...))) + + // Start block-builder (it should consume data from the downscaled partition) + blockbuilder := util.NewTempoBlockBuilder(0) + require.NoError(t, s.StartAndWaitReady(blockbuilder)) + + // Wait for blocks to be flushed from the 
downscaled partition + require.NoError(t, blockbuilder.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempo_block_builder_flushed_blocks"}, e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempodb_blocklist_length"}, + e2e.WaitMissingMetrics, e2e.WithLabelMatchers(&labels.Matcher{Type: labels.MatchEqual, Name: "tenant", Value: "single-tenant"}))) + + // Verify initial traces can be queried from backend storage + trace, err := apiClient.QueryTrace(info.HexID()) + require.NoError(t, err) + require.NotNil(t, trace) + + // Set ingester's partition back to ACTIVE state + req, err = http.NewRequest("DELETE", "http://"+ingester.Endpoint(3200)+"/ingester/prepare-partition-downscale", nil) + require.NoError(t, err) + httpResp, err = apiClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, httpResp.StatusCode) + + // Verify ingester's partition is ACTIVE by checking the timestamp is 0 + req, err = http.NewRequest("GET", "http://"+ingester.Endpoint(3200)+"/ingester/prepare-partition-downscale", nil) + require.NoError(t, err) + httpResp, err = apiClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, httpResp.StatusCode) + require.NoError(t, json.NewDecoder(httpResp.Body).Decode(&result)) + require.Equal(t, float64(0), result["timestamp"].(float64)) // ts == 0 ==> ACTIVE + + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempo_partition_ring_partitions"}, e2e.WithLabelMatchers(partitionStateMatchers("Active")...))) + + // Generate and emit more traces after reactivating the partition + info2 := tempoUtil.NewTraceInfo(time.Now(), "") + require.NoError(t, info2.EmitAllBatches(c)) + + // Wait for new traces to be received + expected2, err := info2.ConstructTraceFromEpoch() + require.NoError(t, err) + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)+util.SpanCount(expected2)), 
"tempo_distributor_spans_received_total")) + + // Wait for the new traces to be flushed by block-builder + require.NoError(t, blockbuilder.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"tempo_block_builder_flushed_blocks"}, e2e.WaitMissingMetrics)) + + // Verify all traces using trace ID lookup + trace, err = apiClient.QueryTrace(info.HexID()) + require.NoError(t, err) + require.NotNil(t, trace) + require.Equal(t, util.SpanCount(expected), util.SpanCount(trace)) + + trace2, err := apiClient.QueryTrace(info2.HexID()) + require.NoError(t, err) + require.NotNil(t, trace2) + require.Equal(t, util.SpanCount(expected2), util.SpanCount(trace2)) +} diff --git a/integration/util/util.go b/integration/util/util.go index d2463fecad4..ffb74df1b94 100644 --- a/integration/util/util.go +++ b/integration/util/util.go @@ -230,6 +230,27 @@ func NewNamedTempoQuerier(name string, extraArgs ...string) *e2e.HTTPService { return s } +func NewTempoBlockBuilder(replica int, extraArgs ...string) *e2e.HTTPService { + return NewNamedTempoBlockBuilder("block-builder", replica, extraArgs...) 
+} + +func NewNamedTempoBlockBuilder(name string, replica int, extraArgs ...string) *e2e.HTTPService { + args := []string{"-config.file=" + filepath.Join(e2e.ContainerSharedDir, "config.yaml"), "-target=block-builder"} + args = buildArgsWithExtra(args, extraArgs) + + s := e2e.NewHTTPService( + name+"-"+strconv.Itoa(replica), + image, + e2e.NewCommandWithoutEntrypoint("/tempo", args...), + e2e.NewHTTPReadinessProbe(3200, "/ready", 200, 299), + 3200, + ) + + s.SetBackoff(TempoBackoff()) + + return s +} + func NewTempoScalableSingleBinary(replica int, extraArgs ...string) *e2e.HTTPService { args := []string{"-config.file=" + filepath.Join(e2e.ContainerSharedDir, "config.yaml"), "-target=scalable-single-binary", "-querier.frontend-address=tempo-" + strconv.Itoa(replica) + ":9095"} args = buildArgsWithExtra(args, extraArgs) diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index 82bde24c866..cab7dafd52c 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -2151,6 +2151,10 @@ type mockRing struct { replicationFactor uint32 } +func (r mockRing) GetSubringForOperationStates(_ ring.Operation) ring.ReadRing { + panic("implement me if required for testing") +} + func (r mockRing) WritableInstancesWithTokensCount() int { panic("implement me if required for testing") } diff --git a/modules/ingester/downscale.go b/modules/ingester/downscale.go new file mode 100644 index 00000000000..dda2e8bb83d --- /dev/null +++ b/modules/ingester/downscale.go @@ -0,0 +1,105 @@ +package ingester + +import ( + "net/http" + + kitlog "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/pkg/util/log" +) + +// PreparePartitionDownscaleHandler prepares the ingester's partition 
downscaling. The partition owned by the +// ingester will switch to INACTIVE state (read-only). +// +// Following methods are supported: +// +// - GET +// Returns timestamp when partition was switched to INACTIVE state, or 0, if partition is not in INACTIVE state. +// +// - POST +// Switches the partition to INACTIVE state (if not yet), and returns the timestamp when the switch to +// INACTIVE state happened. +// +// - DELETE +// Sets partition back from INACTIVE to ACTIVE state, and returns 0 signalling the partition is not in INACTIVE state +func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) { + logger := kitlog.With(log.Logger, "partition", i.ingestPartitionID) + + // Don't allow callers to change the shutdown configuration while we're in the middle + // of starting or shutting down. + if i.State() != services.Running { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + if !i.cfg.IngestStorageConfig.Enabled { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + switch r.Method { + case http.MethodPost: + // It's not allowed to prepare the downscale while in PENDING state. Why? Because if the downscale + // will be later cancelled, we don't know if it was requested in PENDING or ACTIVE state, so we + // don't know to which state reverting back. Given a partition is expected to stay in PENDING state + // for a short period, we simply don't allow this case. 
+ state, _, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if state == ring.PartitionPending { + level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state") + w.WriteHeader(http.StatusConflict) + return + } + + if err := i.ingestPartitionLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil { + level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + case http.MethodDelete: + state, _, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // If partition is inactive, make it active. Other states are a no-op. + if state == ring.PartitionInactive { + // We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency + // in the read path. If the partition is within the lookback period we need to guarantee that partition will be queried. + // Moving back to PENDING will cause us to lose consistency, because PENDING partitions are not queried by design. + // We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than + // "lookback period" ago, but since we delete inactive partitions with no owners that moved to inactive since longer + // than "lookback period" ago, it looks to be an edge case not worth addressing. 
+ if err := i.ingestPartitionLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil { + level.Error(logger).Log("msg", "failed to change partition state to active", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + } + + state, stateTimestamp, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + switch state { + case ring.PartitionInactive: + util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix(), "state": state.String()}) + default: + util.WriteJSONResponse(w, map[string]any{"timestamp": 0, "state": state.String()}) + } +} diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index b9aef70dea9..732bba73459 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -95,6 +95,10 @@ type ReadRing interface { // the input operation. GetReplicationSetForOperation(op Operation) (ReplicationSet, error) + // GetSubringForOperationStates returns a subring containing only instances for the given operation. + // This method only filters by instance state and does not check heartbeat timeouts. + GetSubringForOperationStates(op Operation) ReadRing + ReplicationFactor() int // InstancesCount returns the number of instances in the ring. 
@@ -721,6 +725,21 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro }, nil } +func (r *Ring) GetSubringForOperationStates(op Operation) ReadRing { + r.mtx.RLock() + defer r.mtx.RUnlock() + + shard := make(map[string]InstanceDesc, len(r.ringDesc.Ingesters)) + + for id, inst := range r.ringDesc.Ingesters { + if op.IsInstanceInStateHealthy(inst.State) { + shard[id] = inst + } + } + + return r.buildRingForTheShard(shard) +} + // CountTokens returns the number tokens within the range for each instance. // In case of zone-awareness, this method takes into account only tokens of // the same zone. More precisely, for each instance only the distance between diff --git a/vendor/github.com/grafana/e2e/db/kafka.go b/vendor/github.com/grafana/e2e/db/kafka.go new file mode 100644 index 00000000000..fe91aedc717 --- /dev/null +++ b/vendor/github.com/grafana/e2e/db/kafka.go @@ -0,0 +1,103 @@ +package e2edb + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/e2e" + "github.com/grafana/e2e/images" +) + +type KafkaService struct { + *e2e.HTTPService +} + +func NewKafka() *KafkaService { + return &KafkaService{ + HTTPService: e2e.NewHTTPService( + "kafka", + images.Kafka, + nil, // No custom command. + NewKafkaReadinessProbe(9092), + 9092, + ), + } +} + +func (s *KafkaService) Start(networkName, sharedDir string) (err error) { + // Configures Kafka right before starting it so that we have the networkName to correctly compute + // the advertised host. + s.HTTPService.SetEnvVars(map[string]string{ + // Configure Kafka to run in KRaft mode (without Zookeeper). + "CLUSTER_ID": "NqnEdODVKkiLTfJvqd1uqQ==", // A random ID (16 bytes of a base64-encoded UUID). 
+ "KAFKA_BROKER_ID": "1", + "KAFKA_NODE_ID": "1", + "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093,PLAINTEXT_HOST://localhost:29092", // Host and port to which Kafka binds to for listening. + "KAFKA_PROCESS_ROLES": "broker,controller", + "KAFKA_CONTROLLER_QUORUM_VOTERS": "1@kafka:29093", + "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", + + // Configure the advertised host and post. + "KAFKA_ADVERTISED_LISTENERS": fmt.Sprintf("PLAINTEXT://%s-kafka:9092,PLAINTEXT_HOST://localhost:29092", networkName), + + // RF=1. + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1", + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1", + "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0", + + // No TLS. + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT", + "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT", + + // Enough partitions for integration tests. + "KAFKA_NUM_PARTITIONS": "3", + + "LOG4J_ROOT_LOGLEVEL": "WARN", + }) + + return s.HTTPService.Start(networkName, sharedDir) +} + +// KafkaReadinessProbe checks readiness by ensure a Kafka broker is up and running. +type KafkaReadinessProbe struct { + port int +} + +func NewKafkaReadinessProbe(port int) *KafkaReadinessProbe { + return &KafkaReadinessProbe{ + port: port, + } +} + +func (p *KafkaReadinessProbe) Ready(service *e2e.ConcreteService) (err error) { + const timeout = time.Second + + endpoint := service.Endpoint(p.port) + if endpoint == "" { + return fmt.Errorf("cannot get service endpoint for port %d", p.port) + } else if endpoint == "stopped" { + return errors.New("service has stopped") + } + + client, err := kgo.NewClient(kgo.SeedBrokers(endpoint), kgo.DialTimeout(timeout)) + if err != nil { + return err + } + + // Ensure we close the client once done. 
+ defer client.Close() + + admin := kadm.NewClient(client) + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + _, err = admin.ApiVersions(ctxWithTimeout) + return err +} diff --git a/vendor/github.com/grafana/e2e/images/images.go b/vendor/github.com/grafana/e2e/images/images.go index c6b47ceac79..23f57ff8a25 100644 --- a/vendor/github.com/grafana/e2e/images/images.go +++ b/vendor/github.com/grafana/e2e/images/images.go @@ -12,4 +12,5 @@ var ( BigtableEmulator = "shopify/bigtable-emulator:0.1.0" Cassandra = "rinscy/cassandra:3.11.0" SwiftEmulator = "bouncestorage/swift-aio:55ba4331" + Kafka = "apache/kafka:4.0.0" ) diff --git a/vendor/github.com/grafana/e2e/metrics.go b/vendor/github.com/grafana/e2e/metrics.go index 18378fb447b..dd4ec82ec4b 100644 --- a/vendor/github.com/grafana/e2e/metrics.go +++ b/vendor/github.com/grafana/e2e/metrics.go @@ -127,6 +127,17 @@ func Less(value float64) func(sums ...float64) bool { } } +// Between is an isExpected function for WaitSumMetrics that returns true if given single sum is greater than or equal to lower +// and less than or equal to upper. +func Between(lower, upper float64) func(sums ...float64) bool { + return func(sums ...float64) bool { + if len(sums) != 1 { + panic("equals: expected one value") + } + return sums[0] >= lower && sums[0] <= upper + } +} + // EqualsAmongTwo is an isExpected function for WaitSumMetrics that returns true if first sum is equal to the second. // NOTE: Be careful on scrapes in between of process that changes two metrics. Those are // usually not atomic. 
diff --git a/vendor/modules.txt b/vendor/modules.txt index 244df656400..94b2bc2753c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -691,7 +691,7 @@ github.com/gorilla/handlers # github.com/gorilla/mux v1.8.1 ## explicit; go 1.20 github.com/gorilla/mux -# github.com/grafana/dskit v0.0.0-20250529133336-ec9bc5b3a14e +# github.com/grafana/dskit v0.0.0-20250529170946-28928403e61e ## explicit; go 1.23.0 github.com/grafana/dskit/backoff github.com/grafana/dskit/cancellation @@ -735,8 +735,8 @@ github.com/grafana/dskit/tenant github.com/grafana/dskit/test github.com/grafana/dskit/tracing github.com/grafana/dskit/user -# github.com/grafana/e2e v0.1.1 -## explicit; go 1.18 +# github.com/grafana/e2e v0.1.2-0.20250428181430-708d63bcc673 +## explicit; go 1.19 github.com/grafana/e2e github.com/grafana/e2e/db github.com/grafana/e2e/images