Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Ability to toggle off latency or count metrics in metrics-generator [#2070](https://github.com/grafana/tempo/pull/2070) (@AlexDHoffer)
Comment thread
AlexDCraig marked this conversation as resolved.
* [ENHANCEMENT] Extend `/flush` to support flushing a single tenant [#2260](https://github.com/grafana/tempo/pull/2260) (@kvrhdn)

## v2.1.0-rc.0 / 2023-04-12
Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose/azure/tempo-azure.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ storage:
storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # Default azurite sorage key

overrides:
metrics_generator_processors: [service-graphs, span-metrics]
metrics_generator_processors: [service-graphs, span-metrics]
47 changes: 47 additions & 0 deletions modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,60 @@ func (i *instance) watchOverrides() {
}
}

// Look at the processors defined and see if any are actually span-metrics subprocessors
// If they are, set the appropriate flags in the spanmetrics struct
func (i *instance) updateSubprocessors(desiredProcessors map[string]struct{}, desiredCfg ProcessorConfig) (map[string]struct{}, ProcessorConfig) {
desiredProcessorsFound := false
for d := range desiredProcessors {
if (d == spanmetrics.Name) || (spanmetrics.ParseSubprocessor(d)) {
desiredProcessorsFound = true
}
}

if !desiredProcessorsFound {
return desiredProcessors, desiredCfg
}

_, allOk := desiredProcessors[spanmetrics.Name]
_, countOk := desiredProcessors[spanmetrics.Count.String()]
_, latencyOk := desiredProcessors[spanmetrics.Latency.String()]
_, sizeOk := desiredProcessors[spanmetrics.Size.String()]

if !allOk {
desiredProcessors[spanmetrics.Name] = struct{}{}
desiredCfg.SpanMetrics.Subprocessors[spanmetrics.Count] = false
desiredCfg.SpanMetrics.Subprocessors[spanmetrics.Latency] = false
desiredCfg.SpanMetrics.Subprocessors[spanmetrics.Size] = false
desiredCfg.SpanMetrics.HistogramBuckets = nil

if countOk {
desiredCfg.SpanMetrics.Subprocessors[spanmetrics.Count] = true
}
if latencyOk {
desiredCfg.SpanMetrics.Subprocessors[spanmetrics.Latency] = true
desiredCfg.SpanMetrics.HistogramBuckets = prometheus.ExponentialBuckets(0.002, 2, 14)
}
if sizeOk {
desiredCfg.SpanMetrics.Subprocessors[spanmetrics.Size] = true
}
}

delete(desiredProcessors, spanmetrics.Latency.String())
delete(desiredProcessors, spanmetrics.Count.String())
delete(desiredProcessors, spanmetrics.Size.String())

return desiredProcessors, desiredCfg
}

func (i *instance) updateProcessors() error {
desiredProcessors := i.overrides.MetricsGeneratorProcessors(i.instanceID)
desiredCfg, err := i.cfg.Processor.copyWithOverrides(i.overrides, i.instanceID)
if err != nil {
return err
}

desiredProcessors, desiredCfg = i.updateSubprocessors(desiredProcessors, desiredCfg)

i.processorsMtx.RLock()
toAdd, toRemove, toReplace, err := i.diffProcessors(desiredProcessors, desiredCfg)
i.processorsMtx.RUnlock()
Expand Down
126 changes: 126 additions & 0 deletions modules/generator/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"os"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -146,6 +147,131 @@ func Test_instance_updateProcessors(t *testing.T) {

assert.Len(t, instance.processors, 0)
})

t.Run("add span-latency subprocessor", func(t *testing.T) {
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
spanmetrics.Latency.String(): {},
}
err := instance.updateProcessors()
assert.NoError(t, err)

var expectedConfig spanmetrics.Config
expectedConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
expectedConfig.Dimensions = []string{"namespace"}
expectedConfig.IntrinsicDimensions.StatusMessage = true
expectedConfig.HistogramBuckets = prometheus.ExponentialBuckets(0.002, 2, 14)
expectedConfig.Subprocessors[spanmetrics.Latency] = true
expectedConfig.Subprocessors[spanmetrics.Count] = false
expectedConfig.Subprocessors[spanmetrics.Size] = false

assert.Equal(t, expectedConfig, instance.processors[spanmetrics.Name].(*spanmetrics.Processor).Cfg)

var expectedProcessors = []string{servicegraphs.Name, spanmetrics.Name}
actualProcessors := make([]string, 0, len(instance.processors))

for name := range instance.processors {
actualProcessors = append(actualProcessors, name)
}

sort.Strings(actualProcessors)

assert.Equal(t, expectedProcessors, actualProcessors)
})

t.Run("replace span-latency subprocessor with span-count", func(t *testing.T) {
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
spanmetrics.Count.String(): {},
}
err := instance.updateProcessors()
assert.NoError(t, err)

var expectedConfig spanmetrics.Config
expectedConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
expectedConfig.Dimensions = []string{"namespace"}
expectedConfig.IntrinsicDimensions.StatusMessage = true
expectedConfig.HistogramBuckets = nil
expectedConfig.Subprocessors[spanmetrics.Latency] = false
expectedConfig.Subprocessors[spanmetrics.Count] = true
expectedConfig.Subprocessors[spanmetrics.Size] = false

assert.Equal(t, expectedConfig, instance.processors[spanmetrics.Name].(*spanmetrics.Processor).Cfg)

var expectedProcessors = []string{servicegraphs.Name, spanmetrics.Name}
actualProcessors := make([]string, 0, len(instance.processors))

for name := range instance.processors {
actualProcessors = append(actualProcessors, name)
}

sort.Strings(actualProcessors)

assert.Equal(t, expectedProcessors, actualProcessors)
})

t.Run("use all three subprocessors at once", func(t *testing.T) {
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
spanmetrics.Count.String(): {},
spanmetrics.Latency.String(): {},
spanmetrics.Size.String(): {},
}
err := instance.updateProcessors()
assert.NoError(t, err)

var expectedConfig spanmetrics.Config
expectedConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
expectedConfig.Dimensions = []string{"namespace"}
expectedConfig.IntrinsicDimensions.StatusMessage = true
expectedConfig.HistogramBuckets = prometheus.ExponentialBuckets(0.002, 2, 14)
expectedConfig.Subprocessors[spanmetrics.Latency] = true
expectedConfig.Subprocessors[spanmetrics.Count] = true
expectedConfig.Subprocessors[spanmetrics.Size] = true

assert.Equal(t, expectedConfig, instance.processors[spanmetrics.Name].(*spanmetrics.Processor).Cfg)

var expectedProcessors = []string{servicegraphs.Name, spanmetrics.Name}
actualProcessors := make([]string, 0, len(instance.processors))

for name := range instance.processors {
actualProcessors = append(actualProcessors, name)
}

sort.Strings(actualProcessors)

assert.Equal(t, expectedProcessors, actualProcessors)
})

t.Run("replace subprocessors with span-metrics processor", func(t *testing.T) {
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
spanmetrics.Name: {},
}
err := instance.updateProcessors()
assert.NoError(t, err)

var expectedConfig spanmetrics.Config
expectedConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
expectedConfig.Dimensions = []string{"namespace"}
expectedConfig.IntrinsicDimensions.StatusMessage = true
expectedConfig.HistogramBuckets = prometheus.ExponentialBuckets(0.002, 2, 14)
expectedConfig.Subprocessors[spanmetrics.Latency] = true
expectedConfig.Subprocessors[spanmetrics.Count] = true

assert.Equal(t, expectedConfig, instance.processors[spanmetrics.Name].(*spanmetrics.Processor).Cfg)

var expectedProcessors = []string{servicegraphs.Name, spanmetrics.Name}
actualProcessors := make([]string, 0, len(instance.processors))

for name := range instance.processors {
actualProcessors = append(actualProcessors, name)
}

sort.Strings(actualProcessors)

assert.Equal(t, expectedProcessors, actualProcessors)
})
}

type noopStorage struct{}
Expand Down
8 changes: 8 additions & 0 deletions modules/generator/processor/spanmetrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Config struct {

// If enabled attribute value will be used for metric calculation
SpanMultiplierKey string `yaml:"span_multiplier_key"`

// Subprocessor options for this Processor include Latency, Count, Size
// These are metrics categories that exist under the umbrella of Span Metrics
Subprocessors map[Subprocessor]bool
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand All @@ -38,6 +42,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.IntrinsicDimensions.SpanName = true
cfg.IntrinsicDimensions.SpanKind = true
cfg.IntrinsicDimensions.StatusCode = true
cfg.Subprocessors = make(map[Subprocessor]bool)
cfg.Subprocessors[Latency] = true
cfg.Subprocessors[Count] = true
cfg.Subprocessors[Size] = true
}

type IntrinsicDimensions struct {
Expand Down
34 changes: 25 additions & 9 deletions modules/generator/processor/spanmetrics/spanmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,20 @@ func New(cfg Config, registry registry.Registry) gen.Processor {
labels = append(labels, sanitizeLabelNameWithCollisions(d))
}

return &Processor{
Cfg: cfg,
registry: registry,
spanMetricsCallsTotal: registry.NewCounter(metricCallsTotal, labels),
spanMetricsDurationSeconds: registry.NewHistogram(metricDurationSeconds, labels, cfg.HistogramBuckets),
spanMetricsSizeTotal: registry.NewCounter(metricSizeTotal, labels),
now: time.Now,
p := &Processor{}
if cfg.Subprocessors[Latency] {
p.spanMetricsDurationSeconds = registry.NewHistogram(metricDurationSeconds, labels, cfg.HistogramBuckets)
}
if cfg.Subprocessors[Count] {
p.spanMetricsCallsTotal = registry.NewCounter(metricCallsTotal, labels)
}
if cfg.Subprocessors[Size] {
p.spanMetricsSizeTotal = registry.NewCounter(metricSizeTotal, labels)
}
p.Cfg = cfg
p.registry = registry
p.now = time.Now
return p
}

func (p *Processor) Name() string {
Expand Down Expand Up @@ -123,9 +129,19 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, rs *v1.Resource, spa

registryLabelValues := p.registry.NewLabelValues(labelValues)

p.spanMetricsCallsTotal.Inc(registryLabelValues, 1*spanMultiplier)
if p.Cfg.Subprocessors[Count] {
p.spanMetricsCallsTotal.Inc(registryLabelValues, 1*spanMultiplier)
}

p.spanMetricsSizeTotal.Inc(registryLabelValues, float64(span.Size())*spanMultiplier)
p.spanMetricsDurationSeconds.ObserveWithExemplar(registryLabelValues, latencySeconds, tempo_util.TraceIDToHexString(span.TraceId), spanMultiplier)

if p.Cfg.Subprocessors[Latency] {
p.spanMetricsDurationSeconds.ObserveWithExemplar(registryLabelValues, latencySeconds, tempo_util.TraceIDToHexString(span.TraceId), spanMultiplier)
}

if p.Cfg.Subprocessors[Size] {
p.spanMetricsSizeTotal.Inc(registryLabelValues, float64(span.Size()))
}
}

func sanitizeLabelNameWithCollisions(name string) string {
Expand Down
41 changes: 41 additions & 0 deletions modules/generator/processor/spanmetrics/subprocessors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package spanmetrics

import (
"strings"
)

type Subprocessor int

const (
Latency Subprocessor = iota
Count
Size
)

var SupportedSubprocessors = []Subprocessor{
Latency,
Count,
Size,
}

func (s Subprocessor) String() string {
switch s {
case Latency:
return "span-metrics-latency"
case Count:
return "span-metrics-count"
case Size:
return "span-metrics-size"
default:
return "unsupported"
}
}

func ParseSubprocessor(s string) bool {
for _, p := range SupportedSubprocessors {
if strings.EqualFold(p.String(), s) {
return true
}
}
return false
}