Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/moby/spdystream v0.4.0 // indirect
Expand Down
136 changes: 136 additions & 0 deletions pkg/metrics/custom_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2025 The Aibrix Team.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
customGauges = make(map[string]*prometheus.GaugeVec)
customGaugesMu sync.RWMutex
customCounters = make(map[string]*prometheus.CounterVec)
customCountersMu sync.RWMutex

// Function variables that can be overridden for testing
SetGaugeMetricFnForTest = defaultSetGaugeMetric
IncrementCounterMetricFnForTest = defaultIncrementCounterMetric
)

func SetGaugeMetric(name string, help string, value float64, labelNames []string, labelValues ...string) {
SetGaugeMetricFnForTest(name, help, value, labelNames, labelValues...)
}

func defaultSetGaugeMetric(name string, help string, value float64, labelNames []string, labelValues ...string) {
customGaugesMu.RLock()
gauge, ok := customGauges[name]
customGaugesMu.RUnlock()

if !ok {
customGaugesMu.Lock()
gauge, ok = customGauges[name]
if !ok {
gauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{Name: name, Help: help},
labelNames,
)
customGauges[name] = gauge
}
customGaugesMu.Unlock()
}

gauge.WithLabelValues(labelValues...).Set(value)
}

func IncrementCounterMetric(name string, help string, value float64, labelNames []string, labelValues ...string) {
IncrementCounterMetricFnForTest(name, help, value, labelNames, labelValues...)
}

func defaultIncrementCounterMetric(name string, help string, value float64, labelNames []string, labelValues ...string) {
customCountersMu.RLock()
counter, ok := customCounters[name]
customCountersMu.RUnlock()

if !ok {
customCountersMu.Lock()
counter, ok = customCounters[name]
if !ok {
counter = promauto.NewCounterVec(
prometheus.CounterOpts{Name: name, Help: help},
labelNames,
)
customCounters[name] = counter
}
customCountersMu.Unlock()
}

counter.WithLabelValues(labelValues...).Add(value)
}

func GetMetricHelp(metricName string) string {
metric, ok := Metrics[metricName]
if !ok {
return ""
}
return metric.Description
}

func GetGaugeValueForTest(name string, labelValues ...string) float64 {
customGaugesMu.RLock()
defer customGaugesMu.RUnlock()
// Tests should use their own registry
return 0
}

func SetupMetricsForTest(metricName string, labelNames []string) (*prometheus.GaugeVec, func()) {
testRegistry := prometheus.NewRegistry()
testGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: metricName},
labelNames,
)
testRegistry.MustRegister(testGauge)

originalFn := SetGaugeMetricFnForTest
SetGaugeMetricFnForTest = func(name string, help string, value float64, labels []string, labelValues ...string) {
if name == metricName {
testGauge.WithLabelValues(labelValues...).Set(value)
}
}

return testGauge, func() { SetGaugeMetricFnForTest = originalFn }
}

func SetupCounterMetricsForTest(metricName string, labelNames []string) (*prometheus.CounterVec, func()) {
testRegistry := prometheus.NewRegistry()
testCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{Name: metricName},
labelNames,
)
testRegistry.MustRegister(testCounter)

originalFn := IncrementCounterMetricFnForTest
IncrementCounterMetricFnForTest = func(name string, help string, value float64, labels []string, labelValues ...string) {
if name == metricName {
testCounter.WithLabelValues(labelValues...).Add(value)
}
}

return testCounter, func() { IncrementCounterMetricFnForTest = originalFn }
}
185 changes: 185 additions & 0 deletions pkg/metrics/custom_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
Copyright 2025 The Aibrix Team.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"sync"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
)

const (
testMetricName = "test_metric"
testMetricHelp = "Test metric for unit tests"
testLabelName = "test_label"
testLabelValue = "test_value"
testLabelName2 = "test_label2"
testLabelValue2 = "test_value2"
)

func TestSetGaugeMetric(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()

registry := prometheus.NewRegistry()
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: testMetricName, Help: testMetricHelp},
[]string{testLabelName},
)
registry.MustRegister(gauge)

originalFn := SetGaugeMetricFnForTest
defer func() { SetGaugeMetricFnForTest = originalFn }()

SetGaugeMetricFnForTest = func(name string, help string, value float64, labelNames []string, labelValues ...string) {
if name == testMetricName {
gauge.WithLabelValues(labelValues...).Set(value)
}
}

testValue := 42.0
SetGaugeMetric(testMetricName, testMetricHelp, testValue, []string{testLabelName}, testLabelValue)

value := testutil.ToFloat64(gauge.WithLabelValues(testLabelValue))
assert.Equal(t, testValue, value, "Metric value should match what was set")
}

func TestSetupMetricsForTest(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()

testGauge, cleanup := SetupMetricsForTest(testMetricName, []string{testLabelName})
defer cleanup()

testValue := 42.0
SetGaugeMetric(testMetricName, testMetricHelp, testValue, []string{testLabelName}, testLabelValue)

value := testutil.ToFloat64(testGauge.WithLabelValues(testLabelValue))
assert.Equal(t, testValue, value, "Metric value should match what was set")

cleanup()
SetGaugeMetric(testMetricName, testMetricHelp, 99.0, []string{testLabelName}, testLabelValue)

value = testutil.ToFloat64(testGauge.WithLabelValues(testLabelValue))
assert.Equal(t, testValue, value, "Metric value should not change after cleanup")
}

func TestMetricRegistrationOnlyOnce(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
customGauges = make(map[string]*prometheus.GaugeVec)

for i := 0; i < 5; i++ {
SetGaugeMetric(testMetricName, testMetricHelp, float64(i), []string{testLabelName}, testLabelValue)
}

customGaugesMu.RLock()
count := len(customGauges)
customGaugesMu.RUnlock()

assert.Equal(t, 1, count, "Should only register the metric once")
}

func TestConcurrentMetricUpdates(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
customGauges = make(map[string]*prometheus.GaugeVec)

numGoroutines := 10
numUpdatesPerGoroutine := 100

var wg sync.WaitGroup
wg.Add(numGoroutines)

for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
for j := 0; j < numUpdatesPerGoroutine; j++ {
labelValue := testLabelValue + "_" + string(rune('A'+id))
SetGaugeMetric(testMetricName, testMetricHelp, float64(j), []string{testLabelName}, labelValue)
}
}(i)
}

wg.Wait()

customGaugesMu.RLock()
count := len(customGauges)
gauge := customGauges[testMetricName]
customGaugesMu.RUnlock()

assert.Equal(t, 1, count, "Should only register the metric once even with concurrent updates")

ch := make(chan *prometheus.Desc, 1)
gauge.Describe(ch)
desc := <-ch
assert.Contains(t, desc.String(), testMetricName, "Metric description should contain the metric name")
}

func TestMultipleLabels(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
customGauges = make(map[string]*prometheus.GaugeVec)

registry := prometheus.NewRegistry()
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: testMetricName, Help: testMetricHelp},
[]string{testLabelName, testLabelName2},
)
registry.MustRegister(gauge)

originalFn := SetGaugeMetricFnForTest
defer func() { SetGaugeMetricFnForTest = originalFn }()

SetGaugeMetricFnForTest = func(name string, help string, value float64, labelNames []string, labelValues ...string) {
if name == testMetricName {
gauge.WithLabelValues(labelValues...).Set(value)
}
}

testValue := 42.0
SetGaugeMetric(testMetricName, testMetricHelp, testValue,
[]string{testLabelName, testLabelName2},
testLabelValue, testLabelValue2)

value := testutil.ToFloat64(gauge.WithLabelValues(testLabelValue, testLabelValue2))
assert.Equal(t, testValue, value, "Metric value with multiple labels should match what was set")
}

func TestSetupCounterMetricsForTest(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
customCounters = make(map[string]*prometheus.CounterVec)

testCounter, cleanup := SetupCounterMetricsForTest("test_counter", []string{"pod", "model"})
defer cleanup()

IncrementCounterMetric("test_counter", "Test counter metric", 5.0, []string{"pod", "model"}, "pod-1", "model-1")

metricValue := testutil.ToFloat64(testCounter.WithLabelValues("pod-1", "model-1"))
assert.Equal(t, 5.0, metricValue, "Counter metric value should match the incremented value")

IncrementCounterMetric("test_counter", "Test counter metric", 3.0, []string{"pod", "model"}, "pod-1", "model-1")

metricValue = testutil.ToFloat64(testCounter.WithLabelValues("pod-1", "model-1"))
assert.Equal(t, 8.0, metricValue, "Counter metric value should accumulate")

IncrementCounterMetric("test_counter", "Test counter metric", 10.0, []string{"pod", "model"}, "pod-2", "model-1")

metricValue = testutil.ToFloat64(testCounter.WithLabelValues("pod-1", "model-1"))
assert.Equal(t, 8.0, metricValue, "Original counter metric should be unchanged")

metricValue = testutil.ToFloat64(testCounter.WithLabelValues("pod-2", "model-1"))
assert.Equal(t, 10.0, metricValue, "Counter metric with different labels should have correct value")
}
12 changes: 10 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const (
MaxLora = "max_lora"
WaitingLoraAdapters = "waiting_lora_adapters"
RunningLoraAdapters = "running_lora_adapters"
// Realtime metrics
RealtimeNumRequestsRunning = "realtime_num_requests_running"
VTCBucketSizeActive = "vtc_bucket_size_active"
RealtimeNumRequestsRunning = "realtime_num_requests_running"
)

var (
Expand Down Expand Up @@ -303,5 +303,13 @@ var (
RawMetricName: "lora_requests_info",
Description: "Count of waiting Lora Adapters",
},
VTCBucketSizeActive: {
MetricScope: PodModelMetricScope,
MetricSource: PodRawMetrics,
MetricType: MetricType{
Raw: Gauge,
},
Description: "Current adaptive bucket size used by VTC algorithm for token normalization",
},
}
)
11 changes: 10 additions & 1 deletion pkg/plugins/gateway/algorithms/vtc/vtc_basic.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024 The Aibrix Team.
Copyright 2025 The Aibrix Team.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -134,6 +134,14 @@ func (r *BasicVTCRouter) Route(ctx *types.RoutingContext, readyPodList types.Pod
// relevant to the current system load while maintaining a minimum sensitivity
adaptiveBucketSize := math.Max(tokenTrackerMinTokens, (minTokens+maxTokens)/2)

metrics.SetGaugeMetric(
metrics.VTCBucketSizeActive,
metrics.GetMetricHelp(metrics.VTCBucketSizeActive),
adaptiveBucketSize,
[]string{"pod", "model"},
pod.Name, ctx.Model,
)

// Apply clamped linear mapping: tokens / bucket_size, clamped to [0, npods-1]
normalizedTokens := math.Min(float64(userTokens)/adaptiveBucketSize, float64(len(readyPods)-1))

Expand Down Expand Up @@ -213,5 +221,6 @@ func (r *BasicVTCRouter) Route(ctx *types.RoutingContext, readyPodList types.Pod
func (r *BasicVTCRouter) SubscribedMetrics() []string {
return []string{
metrics.NumRequestsRunning,
metrics.VTCBucketSizeActive,
}
}
Loading
Loading