-
Notifications
You must be signed in to change notification settings - Fork 692
enhancement: Improve live store readiness state management #6238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
606f90e
a43c80d
b0573bb
2f6e211
8af24e7
74d07fe
cabbb60
d3d1003
39369ab
86722a4
290a8ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| live_store: | ||
| readiness_target_lag: 100ms | ||
| readiness_max_wait: 10s |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| live_store: | ||
| readiness_target_lag: 100ms | ||
| readiness_max_wait: 60s |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,225 @@ | ||
| package deployments | ||
|
|
||
| import ( | ||
| "net/http" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/grafana/e2e" | ||
| "github.com/grafana/tempo/integration/util" | ||
| tempoUtil "github.com/grafana/tempo/pkg/util" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // TestLiveStoreReadinessDefaultBehavior verifies that with readiness_target_lag=0 (default), | ||
| // the LiveStore becomes ready immediately without waiting | ||
| func TestLiveStoreReadinessDefaultBehavior(t *testing.T) { | ||
| util.RunIntegrationTests(t, util.TestHarnessConfig{ | ||
| Components: util.ComponentsRecentDataQuerying, | ||
| }, func(h *util.TempoHarness) { | ||
| liveStoreA := h.Services[util.ServiceLiveStoreZoneA] | ||
|
|
||
| // With default config (readiness_target_lag=0), LiveStore should be ready immediately | ||
| require.NoError(t, liveStoreA.WaitReady()) | ||
|
|
||
| // Verify /ready endpoint returns 200 | ||
| req, err := http.NewRequest("GET", "http://"+liveStoreA.Endpoint(3200)+"/ready", nil) | ||
| require.NoError(t, err) | ||
| httpResp, err := http.DefaultClient.Do(req) | ||
| require.NoError(t, err) | ||
| require.Equal(t, 200, httpResp.StatusCode) | ||
|
|
||
| // Verify tempo_live_store_ready metric is 1 | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.Equals(1), "tempo_live_store_ready")) | ||
| }) | ||
| } | ||
|
|
||
| // TestLiveStoreReadinessWithCatchUp verifies that readiness waiting works correctly | ||
| // and metrics are recorded | ||
| func TestLiveStoreReadinessWithCatchUp(t *testing.T) { | ||
| util.RunIntegrationTests(t, util.TestHarnessConfig{ | ||
| ConfigOverlay: "config-livestore-readiness-enabled.yaml", | ||
| Components: util.ComponentsRecentDataQuerying, | ||
| }, func(h *util.TempoHarness) { | ||
| liveStoreA := h.Services[util.ServiceLiveStoreZoneA] | ||
| liveStoreB := h.Services[util.ServiceLiveStoreZoneB] | ||
|
|
||
| // Stop liveStoreB to simplify the test | ||
| require.NoError(t, liveStoreB.Stop()) | ||
|
|
||
| // Wait for LiveStore to be ready | ||
| h.WaitTracesWritable(t) | ||
|
|
||
| // Write some traces to create Kafka lag | ||
| for i := 0; i < 5; i++ { | ||
| info := tempoUtil.NewTraceInfo(time.Now(), "") | ||
| require.NoError(t, h.WriteTraceInfo(info, "")) | ||
| } | ||
|
|
||
| // Wait for traces to be processed | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.GreaterOrEqual(5), "tempo_live_store_traces_created_total")) | ||
|
|
||
| // Stop the LiveStore | ||
| require.NoError(t, liveStoreA.Stop()) | ||
|
|
||
| // Write more traces during downtime to create lag | ||
| for i := 0; i < 3; i++ { | ||
| require.NoError(t, h.WriteTraceInfo(tempoUtil.NewTraceInfo(time.Now(), ""), "")) | ||
| time.Sleep(100 * time.Millisecond) | ||
| } | ||
|
|
||
| // Restart LiveStore | ||
| require.NoError(t, liveStoreA.Start(h.TestScenario.NetworkName(), h.TestScenario.SharedDir())) | ||
|
|
||
| // Wait for it to become ready (it should catch up) | ||
| require.NoError(t, liveStoreA.WaitReady()) | ||
|
|
||
| // Verify /ready endpoint returns 200 | ||
| req, err := http.NewRequest("GET", "http://"+liveStoreA.Endpoint(3200)+"/ready", nil) | ||
| require.NoError(t, err) | ||
| httpResp, err := http.DefaultClient.Do(req) | ||
| require.NoError(t, err) | ||
| require.Equal(t, 200, httpResp.StatusCode) | ||
|
|
||
| // Verify tempo_live_store_ready metric is 1 | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.Equals(1), "tempo_live_store_ready")) | ||
|
|
||
| // Verify catch_up_duration metric was recorded | ||
| // The metric should have at least one observation | ||
| metrics, err := liveStoreA.SumMetrics([]string{"tempo_live_store_catch_up_duration_seconds"}) | ||
| require.NoError(t, err) | ||
| require.Greater(t, metrics[0], 0.0, "catch_up_duration should have been recorded") | ||
| }) | ||
| } | ||
|
|
||
| // TestLiveStoreReadinessMaxWaitTimeout verifies that LiveStore becomes ready | ||
| // after readiness_max_wait even if lag is still high | ||
| func TestLiveStoreReadinessMaxWaitTimeout(t *testing.T) { | ||
| util.RunIntegrationTests(t, util.TestHarnessConfig{ | ||
| ConfigOverlay: "config-livestore-readiness-timeout.yaml", | ||
| Components: util.ComponentsRecentDataQuerying, | ||
| }, func(h *util.TempoHarness) { | ||
| liveStoreA := h.Services[util.ServiceLiveStoreZoneA] | ||
| liveStoreB := h.Services[util.ServiceLiveStoreZoneB] | ||
|
|
||
| // Stop liveStoreB to simplify the test | ||
| require.NoError(t, liveStoreB.Stop()) | ||
|
|
||
| // Wait for LiveStore to be ready initially | ||
| h.WaitTracesWritable(t) | ||
|
|
||
| // Write some traces | ||
| for i := 0; i < 3; i++ { | ||
| info := tempoUtil.NewTraceInfo(time.Now(), "") | ||
| require.NoError(t, h.WriteTraceInfo(info, "")) | ||
| } | ||
|
|
||
| // Wait for traces to be processed | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.GreaterOrEqual(3), "tempo_live_store_traces_created_total")) | ||
|
|
||
| // Stop the LiveStore | ||
| require.NoError(t, liveStoreA.Stop()) | ||
|
|
||
| // Write many traces during downtime to create significant lag | ||
| // With readiness_target_lag=100ms and readiness_max_wait=5s, | ||
| // the LiveStore should become ready after 5s even if lag is high | ||
| for i := 0; i < 50; i++ { | ||
| require.NoError(t, h.WriteTraceInfo(tempoUtil.NewTraceInfo(time.Now(), ""), "")) | ||
| time.Sleep(200 * time.Millisecond) // Create lag that exceeds target | ||
| } | ||
|
|
||
| // Restart LiveStore | ||
| startTime := time.Now() | ||
| require.NoError(t, liveStoreA.Start(h.TestScenario.NetworkName(), h.TestScenario.SharedDir())) | ||
|
|
||
| // It should become ready due to max_wait timeout (5s) | ||
| require.NoError(t, liveStoreA.WaitReady()) | ||
| elapsed := time.Since(startTime) | ||
|
|
||
| // Should have waited close to max_wait (5s), but not too long | ||
| require.Less(t, elapsed, 15*time.Second, "should become ready within reasonable time") | ||
|
|
||
| // Verify /ready endpoint returns 200 | ||
| req, err := http.NewRequest("GET", "http://"+liveStoreA.Endpoint(3200)+"/ready", nil) | ||
| require.NoError(t, err) | ||
| httpResp, err := http.DefaultClient.Do(req) | ||
| require.NoError(t, err) | ||
| require.Equal(t, 200, httpResp.StatusCode) | ||
|
|
||
| // Verify tempo_live_store_ready metric is 1 | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.Equals(1), "tempo_live_store_ready")) | ||
| }) | ||
| } | ||
|
|
||
| // TestLiveStoreReadinessRestartWithLag verifies restart scenario with accumulated Kafka lag | ||
| func TestLiveStoreReadinessRestartWithLag(t *testing.T) { | ||
| util.RunIntegrationTests(t, util.TestHarnessConfig{ | ||
| ConfigOverlay: "config-livestore-readiness-enabled.yaml", | ||
| Components: util.ComponentsRecentDataQuerying, | ||
| }, func(h *util.TempoHarness) { | ||
| liveStoreA := h.Services[util.ServiceLiveStoreZoneA] | ||
| liveStoreB := h.Services[util.ServiceLiveStoreZoneB] | ||
|
|
||
| // Stop liveStoreB to simplify the test | ||
| require.NoError(t, liveStoreB.Stop()) | ||
|
|
||
| // Wait for initial readiness | ||
| h.WaitTracesWritable(t) | ||
|
|
||
| // Write initial traces | ||
| for i := 0; i < 3; i++ { | ||
| info := tempoUtil.NewTraceInfo(time.Now(), "") | ||
| require.NoError(t, h.WriteTraceInfo(info, "")) | ||
| } | ||
|
|
||
| // Wait for traces to be processed | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.GreaterOrEqual(3), "tempo_live_store_traces_created_total")) | ||
|
|
||
| // Verify ready state before restart | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.Equals(1), "tempo_live_store_ready")) | ||
|
|
||
| // Stop LiveStore | ||
| require.NoError(t, liveStoreA.Stop()) | ||
|
|
||
| // Write traces during downtime to accumulate lag | ||
| for i := 0; i < 10; i++ { | ||
| require.NoError(t, h.WriteTraceInfo(tempoUtil.NewTraceInfo(time.Now(), ""), "")) | ||
| time.Sleep(100 * time.Millisecond) | ||
| } | ||
|
|
||
| // Restart LiveStore | ||
| require.NoError(t, liveStoreA.Start(h.TestScenario.NetworkName(), h.TestScenario.SharedDir())) | ||
|
|
||
| // Initially, LiveStore should not be ready (503) while catching up | ||
| // Note: This check is timing-sensitive and might pass if catch-up is very fast | ||
| req, err := http.NewRequest("GET", "http://"+liveStoreA.Endpoint(3200)+"/ready", nil) | ||
| require.NoError(t, err) | ||
| httpResp, err := http.DefaultClient.Do(req) | ||
| require.NoError(t, err) | ||
| // During catch-up, we might see 503 | ||
| if httpResp.StatusCode == 503 { | ||
| t.Log("LiveStore correctly returns 503 during catch-up") | ||
| } | ||
|
|
||
| // Wait for it to become ready after catching up | ||
| require.NoError(t, liveStoreA.WaitReady()) | ||
|
|
||
| // Verify /ready endpoint returns 200 | ||
| req, err = http.NewRequest("GET", "http://"+liveStoreA.Endpoint(3200)+"/ready", nil) | ||
| require.NoError(t, err) | ||
| httpResp, err = http.DefaultClient.Do(req) | ||
| require.NoError(t, err) | ||
| require.Equal(t, 200, httpResp.StatusCode) | ||
|
|
||
| // Verify tempo_live_store_ready metric is 1 | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.Equals(1), "tempo_live_store_ready")) | ||
|
|
||
| // Verify some traces have been processed | ||
| require.NoError(t, liveStoreA.WaitSumMetrics(e2e.GreaterOrEqual(1), "tempo_live_store_traces_created_total")) | ||
|
|
||
| // Verify catch_up_duration metric was recorded | ||
| metrics, err := liveStoreA.SumMetrics([]string{"tempo_live_store_catch_up_duration_seconds"}) | ||
| require.NoError(t, err) | ||
| require.Greater(t, metrics[0], 0.0, "catch_up_duration should have been recorded") | ||
| }) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,16 @@ type Config struct { | |
| // Block configuration | ||
| BlockConfig common.BlockConfig `yaml:"block_config"` | ||
|
|
||
| // ReadinessTargetLag is the target consumer lag threshold before the live-store | ||
| // is considered ready to serve queries. The live-store will wait until lag drops | ||
| // below this value. Set to 0 to disable readiness waiting (default, backward compatible). | ||
| ReadinessTargetLag time.Duration `yaml:"readiness_target_lag"` | ||
|
|
||
| // ReadinessMaxWait is the maximum time to wait for catching up at startup. | ||
| // If this timeout is exceeded, the live-store becomes ready anyway. | ||
| // Only used if ReadinessTargetLag > 0. Default: 30m. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this create a read outage if both zones have lag?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should not. The wait is bounded by `readiness_max_wait`, so the live-store becomes ready once that timeout elapses even if lag remains. EDIT: also, this behavior is only triggered at start — before the live-store is marked ready. |
||
| ReadinessMaxWait time.Duration `yaml:"readiness_max_wait"` | ||
|
|
||
| // testing config | ||
| holdAllBackgroundProcesses bool `yaml:"-"` // if this is set to true, the live store will never release its background processes | ||
| } | ||
|
|
@@ -83,13 +93,19 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) | |
|
|
||
| cfg.CommitInterval = 5 * time.Second | ||
|
|
||
| // Readiness config - default to disabled (backward compatible) | ||
| cfg.ReadinessTargetLag = 0 | ||
| cfg.ReadinessMaxWait = 30 * time.Minute | ||
|
|
||
| // Initialize block config with defaults | ||
| cfg.BlockConfig.RegisterFlagsAndApplyDefaults(prefix+".block", f) | ||
|
|
||
| // Register flags for existing fields | ||
| f.DurationVar(&cfg.CompleteBlockTimeout, prefix+".complete-block-timeout", cfg.CompleteBlockTimeout, "Duration to keep blocks in the live store after they have been flushed.") | ||
| f.UintVar(&cfg.QueryBlockConcurrency, prefix+".concurrent-blocks", cfg.QueryBlockConcurrency, "Number of concurrent blocks to query for metrics.") | ||
| f.Float64Var(&cfg.Metrics.TimeOverlapCutoff, prefix+".metrics.time-overlap-cutoff", cfg.Metrics.TimeOverlapCutoff, "Time overlap cutoff ratio for metrics queries (0.0-1.0).") | ||
| f.DurationVar(&cfg.ReadinessTargetLag, prefix+".readiness-target-lag", cfg.ReadinessTargetLag, "Target lag threshold before live-store is ready. 0 disables waiting (backward compatible).") | ||
| f.DurationVar(&cfg.ReadinessMaxWait, prefix+".readiness-max-wait", cfg.ReadinessMaxWait, "Maximum time to wait for catching up at startup. Only used if readiness-target-lag > 0.") | ||
|
|
||
| cfg.WAL.RegisterFlags(f) // WAL config has no flags, only defaults | ||
| cfg.WAL.Version = encoding.DefaultEncoding().Version() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Annotations don't catch e2e tests yet