Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [ENHANCEMENT] Add metric to track livestore block cut reasons [#6922](https://github.com/grafana/tempo/pull/6922) (@zhxiaogg)
* [ENHANCEMENT] Enable async parquet read mode for WAL completion path [#6967](https://github.com/grafana/tempo/pull/6967) (@zhxiaogg)
* [ENHANCEMENT] metrics-generator: add `leave_consumer_group_on_shutdown` to send LeaveGroup on shutdown for immediate partition reassignment instead of waiting for session timeout [#6575](https://github.com/grafana/tempo/pull/6575) (@zalegrala)
* [ENHANCEMENT] Send stale marker to remote write when series expire [#7056](https://github.com/grafana/tempo/pull/7056) (@LucasMRC)
* [BUGFIX] livestore: check readiness before lag for SearchRecent and QueryRange queries [#6911](https://github.com/grafana/tempo/pull/6911) (@zhxiaogg)
* [BUGFIX] Fix integer overflow in query parameters by using `strconv.ParseUint` instead of `strconv.Atoi`/`strconv.ParseInt` for unsigned integer fields. [#6612](https://github.com/grafana/tempo/pull/6612) (@bejaratommy)
* [BUGFIX] Fix live-store SearchTagValuesV2 disk cache never being populated on complete blocks [#6858](https://github.com/grafana/tempo/pull/6858) (@mapno)
Expand Down
14 changes: 14 additions & 0 deletions modules/generator/registry/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package registry

import (
"context"
"errors"
"fmt"

"github.com/prometheus/prometheus/model/exemplar"
Expand Down Expand Up @@ -58,6 +59,19 @@ func (n noopAppender) AppendHistogramSTZeroSample(_ storage.SeriesRef, _ labels.
return 0, nil
}

// errorAppender is a test double that behaves like noopAppender except
// that every Append call fails, allowing tests to exercise the error
// paths of code that writes staleness markers.
type errorAppender struct {
	noopAppender
}

// Append always fails with a fixed error; the returned SeriesRef is unused.
func (errorAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
	return 0, errors.New("append error")
}

// Compile-time checks that errorAppender satisfies the storage interfaces.
var (
	_ storage.Appendable = (*errorAppender)(nil)
	_ storage.Appender   = (*errorAppender)(nil)
)

type capturingAppender struct {
samples []sample
histograms []histogramSample
Expand Down
18 changes: 17 additions & 1 deletion modules/generator/registry/counter.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package registry

import (
"math"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
"go.uber.org/multierr"
)

type counter struct {
Expand Down Expand Up @@ -171,15 +174,28 @@ func (c *counter) countSeriesDemand() int {
return int(c.seriesDemand.Estimate())
}

func (c *counter) removeStaleSeries(staleTimeMs int64) {
func (c *counter) removeStaleSeries(appender storage.Appender, timeMs, staleTimeMs int64) error {
c.seriesMtx.Lock()
defer c.seriesMtx.Unlock()

errs := []error{}

for hash, s := range c.series {
if s.lastUpdated.Load() < staleTimeMs {
if appender != nil {
_, err := appender.Append(0, s.labels, timeMs, math.Float64frombits(value.StaleNaN))
if err != nil {
errs = append(errs, err)
}
Comment on lines +185 to +189
Copy link
Copy Markdown
Author

@LucasMRC LucasMRC Apr 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure on how to handle errors in the updated removeStaleSeries functions, I copied the pattern that I saw in modules/generator/config.go.

}
delete(c.series, hash)
c.lifecycler.OnDelete(hash, 1)
}
}
c.seriesDemand.Advance()

if len(errs) > 0 {
return multierr.Combine(errs...)
}
return nil
}
33 changes: 29 additions & 4 deletions modules/generator/registry/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func Test_counter_removeStaleSeries(t *testing.T) {
c.Inc(buildTestLabels([]string{"label"}, []string{"value-1"}), 1.0)
c.Inc(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.0)

c.removeStaleSeries(timeMs)
appender := noopAppender{}
_ = c.removeStaleSeries(appender, 0, timeMs)

assert.Equal(t, 0, removedSeries)

Expand All @@ -166,7 +167,7 @@ func Test_counter_removeStaleSeries(t *testing.T) {
// update value-2 series
c.Inc(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.0)

c.removeStaleSeries(timeMs)
_ = c.removeStaleSeries(appender, 0, timeMs)

assert.Equal(t, 1, removedSeries)

Expand All @@ -177,6 +178,28 @@ func Test_counter_removeStaleSeries(t *testing.T) {
collectMetricAndAssert(t, c, collectionTimeMs, 1, expectedSamples, nil)
}

// Test_counter_removeStaleSeries_appenderError verifies that append errors
// from the staleness-marker appender are surfaced to the caller while the
// stale series are still removed.
func Test_counter_removeStaleSeries_appenderError(t *testing.T) {
	var removedSeries int
	lifecycler := &mockLimiter{
		onDeleteFunc: func(_ uint64, count uint32) {
			assert.Equal(t, uint32(1), count)
			removedSeries++
		},
	}

	c := newCounter("my_counter", lifecycler, map[string]string{}, 15*time.Minute)

	c.Inc(buildTestLabels([]string{"label"}, []string{"value-1"}), 1.0)
	c.Inc(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.0)

	appender := errorAppender{}
	timeMs := time.Now().Add(1 * time.Hour).UnixMilli()
	err := c.removeStaleSeries(appender, 0, timeMs)

	assert.Error(t, err)
	// testify convention: expected value first, actual second.
	assert.Equal(t, 2, removedSeries)
}

func Test_counter_externalLabels(t *testing.T) {
c := newCounter("my_counter", noopLimiter, map[string]string{"external_label": "external_value"}, 15*time.Minute)

Expand Down Expand Up @@ -233,7 +256,8 @@ func Test_counter_concurrencyDataRace(t *testing.T) {
})

go accessor(func() {
c.removeStaleSeries(time.Now().UnixMilli())
appender := noopAppender{}
_ = c.removeStaleSeries(appender, 0, time.Now().UnixMilli())
})

time.Sleep(200 * time.Millisecond)
Expand Down Expand Up @@ -373,8 +397,9 @@ func Test_counter_demandDecay(t *testing.T) {
assert.Greater(t, initialDemand, 0)

// Advance the cardinality tracker enough times to clear the window
appender := noopAppender{}
for i := 0; i < 5; i++ {
c.removeStaleSeries(time.Now().UnixMilli())
_ = c.removeStaleSeries(appender, 0, time.Now().UnixMilli())
}

// Demand should have decreased or be zero
Expand Down
18 changes: 17 additions & 1 deletion modules/generator/registry/gauge.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package registry

import (
"math"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
"go.uber.org/multierr"
)

var _ metric = (*gauge)(nil)
Expand Down Expand Up @@ -150,15 +153,28 @@ func (g *gauge) countSeriesDemand() int {
return int(g.seriesDemand.Estimate())
}

func (g *gauge) removeStaleSeries(staleTimeMs int64) {
// removeStaleSeries deletes every series whose last update is older than
// staleTimeMs. For each removed series a staleness marker (StaleNaN) is
// appended at timeMs so remote-write consumers can mark the series stale
// immediately instead of waiting for it to time out.
//
// A nil appender skips the staleness markers but still removes the series.
// Append errors are collected and returned combined; removal proceeds
// regardless of individual append failures.
func (g *gauge) removeStaleSeries(appender storage.Appender, timeMs, staleTimeMs int64) error {
	g.seriesMtx.Lock()
	defer g.seriesMtx.Unlock()

	var errs []error

	for hash, s := range g.series {
		if s.lastUpdated.Load() < staleTimeMs {
			if appender != nil {
				// Emit a staleness marker; a failure here must not prevent
				// the series from being removed.
				if _, err := appender.Append(0, s.labels, timeMs, math.Float64frombits(value.StaleNaN)); err != nil {
					errs = append(errs, err)
				}
			}
			delete(g.series, hash)
			g.lifecycler.OnDelete(hash, 1)
		}
	}
	// Advance the cardinality estimator window even when nothing was removed.
	g.seriesDemand.Advance()

	// multierr.Combine returns nil for an empty slice, so no length check is needed.
	return multierr.Combine(errs...)
}
33 changes: 29 additions & 4 deletions modules/generator/registry/gauge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ func Test_gauge_removeStaleSeries(t *testing.T) {
c.Inc(buildTestLabels([]string{"label"}, []string{"value-1"}), 1.0)
c.Inc(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.0)

c.removeStaleSeries(timeMs)
appender := noopAppender{}
_ = c.removeStaleSeries(appender, 0, timeMs)

assert.Equal(t, 0, removedSeries)

Expand All @@ -186,7 +187,7 @@ func Test_gauge_removeStaleSeries(t *testing.T) {
// update value-2 series
c.Inc(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.0)

c.removeStaleSeries(timeMs)
_ = c.removeStaleSeries(appender, 0, timeMs)

assert.Equal(t, 1, removedSeries)

Expand All @@ -197,6 +198,28 @@ func Test_gauge_removeStaleSeries(t *testing.T) {
collectMetricAndAssert(t, c, collectionTimeMs, 1, expectedSamples, nil)
}

// Test_gauge_removeStaleSeries_appenderError verifies that append errors
// from the staleness-marker appender are surfaced to the caller while the
// stale series are still removed.
func Test_gauge_removeStaleSeries_appenderError(t *testing.T) {
	var removedSeries int
	lifecycler := &mockLimiter{
		onDeleteFunc: func(_ uint64, count uint32) {
			assert.Equal(t, uint32(1), count)
			removedSeries++
		},
	}

	c := newGauge("my_gauge", lifecycler, map[string]string{}, 15*time.Minute)

	c.Inc(buildTestLabels([]string{"label"}, []string{"value-1"}), 1.0)
	c.Inc(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.0)

	appender := errorAppender{}
	timeMs := time.Now().Add(1 * time.Hour).UnixMilli()
	err := c.removeStaleSeries(appender, 0, timeMs)

	assert.Error(t, err)
	// testify convention: expected value first, actual second.
	assert.Equal(t, 2, removedSeries)
}

func Test_gauge_externalLabels(t *testing.T) {
c := newGauge("my_gauge", noopLimiter, map[string]string{"external_label": "external_value"}, 15*time.Minute)

Expand Down Expand Up @@ -250,7 +273,8 @@ func Test_gauge_concurrencyDataRace(t *testing.T) {
})

go accessor(func() {
c.removeStaleSeries(time.Now().UnixMilli())
appender := noopAppender{}
_ = c.removeStaleSeries(appender, 0, time.Now().UnixMilli())
})

time.Sleep(200 * time.Millisecond)
Expand Down Expand Up @@ -369,8 +393,9 @@ func Test_gauge_demandDecay(t *testing.T) {
assert.Greater(t, initialDemand, 0)

// Advance the cardinality tracker enough times to clear the window
appender := noopAppender{}
for i := 0; i < 5; i++ {
g.removeStaleSeries(time.Now().Add(time.Hour).UnixMilli())
_ = g.removeStaleSeries(appender, 0, time.Now().Add(time.Hour).UnixMilli())
}

// Demand should have decreased or be zero
Expand Down
26 changes: 25 additions & 1 deletion modules/generator/registry/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
"go.uber.org/multierr"
)

var _ metric = (*histogram)(nil)
Expand Down Expand Up @@ -258,17 +260,39 @@ func (h *histogram) countSeriesDemand() int {
return int(est) * int(h.activeSeriesPerHistogramSerie())
}

func (h *histogram) removeStaleSeries(staleTimeMs int64) {
// removeStaleSeries deletes every histogram series whose last update is
// older than staleTimeMs. For each removed series a staleness marker
// (StaleNaN) is appended at timeMs for the count, sum, and every bucket
// sub-series, so remote-write consumers can mark them stale immediately.
//
// A nil appender skips the staleness markers but still removes the series.
// Append errors are collected and returned combined; removal proceeds
// regardless of individual append failures.
func (h *histogram) removeStaleSeries(appender storage.Appender, timeMs, staleTimeMs int64) error {
	h.seriesMtx.Lock()
	defer h.seriesMtx.Unlock()

	var errs []error
	staleMarker := math.Float64frombits(value.StaleNaN)
	// appendStale emits one staleness marker and records any failure;
	// a failed append must not prevent series removal.
	appendStale := func(lbls labels.Labels) {
		if _, err := appender.Append(0, lbls, timeMs, staleMarker); err != nil {
			errs = append(errs, err)
		}
	}

	for hash, s := range h.series {
		if s.lastUpdated.Load() < staleTimeMs {
			if appender != nil {
				appendStale(s.countLabels)
				appendStale(s.sumLabels)
				for _, l := range s.bucketLabels {
					appendStale(l)
				}
			}
			delete(h.series, hash)
			h.lifecycler.OnDelete(hash, h.activeSeriesPerHistogramSerie())
		}
	}
	// Advance the cardinality estimator window even when nothing was removed.
	h.seriesDemand.Advance()

	// multierr.Combine returns nil for an empty slice, so no length check is needed.
	return multierr.Combine(errs...)
}

func (h *histogram) activeSeriesPerHistogramSerie() uint32 {
Expand Down
30 changes: 27 additions & 3 deletions modules/generator/registry/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ func Test_histogram_removeStaleSeries(t *testing.T) {
h.ObserveWithExemplar(buildTestLabels([]string{"label"}, []string{"value-1"}), 1.0, "", 1.0)
h.ObserveWithExemplar(buildTestLabels([]string{"label"}, []string{"value-2"}), 1.5, "", 1.0)

h.removeStaleSeries(timeMs)
appender := noopAppender{}
_ = h.removeStaleSeries(appender, 0, timeMs)

assert.Equal(t, 0, removedSeries)

Expand Down Expand Up @@ -278,7 +279,7 @@ func Test_histogram_removeStaleSeries(t *testing.T) {
// update value-2 series
h.ObserveWithExemplar(buildTestLabels([]string{"label"}, []string{"value-2"}), 2.5, "", 1.0)

h.removeStaleSeries(timeMs)
_ = h.removeStaleSeries(appender, 0, timeMs)

assert.Equal(t, 1, removedSeries)

Expand All @@ -293,6 +294,28 @@ func Test_histogram_removeStaleSeries(t *testing.T) {
collectMetricAndAssert(t, h, collectionTimeMs, 5, expectedSamples, nil)
}

// Test_histogram_removeStaleSeries_appenderError verifies that append errors
// from the staleness-marker appender are surfaced to the caller while the
// stale series are still removed.
func Test_histogram_removeStaleSeries_appenderError(t *testing.T) {
	var removedSeries int
	lifecycler := &mockLimiter{
		onDeleteFunc: func(_ uint64, count uint32) {
			assert.Equal(t, uint32(5), count)
			removedSeries++
		},
	}

	h := newHistogram("my_histogram", []float64{1.0, 2.0}, lifecycler, "", nil, 15*time.Minute)

	timeMs := time.Now().Add(1 * time.Hour).UnixMilli()
	h.ObserveWithExemplar(buildTestLabels([]string{"label"}, []string{"value-1"}), 1.0, "", 1.0)
	h.ObserveWithExemplar(buildTestLabels([]string{"label"}, []string{"value-2"}), 1.5, "", 1.0)

	appender := errorAppender{}
	err := h.removeStaleSeries(appender, 0, timeMs)

	assert.Error(t, err)
	// testify convention: expected value first, actual second.
	assert.Equal(t, 2, removedSeries)
}

func Test_histogram_externalLabels(t *testing.T) {
extLabels := map[string]string{"external_label": "external_value"}

Expand Down Expand Up @@ -365,7 +388,8 @@ func Test_histogram_concurrencyDataRace(t *testing.T) {
})

go accessor(func() {
h.removeStaleSeries(time.Now().UnixMilli())
appender := noopAppender{}
_ = h.removeStaleSeries(appender, 0, time.Now().UnixMilli())
})

time.Sleep(200 * time.Millisecond)
Expand Down
Loading
Loading