Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/cache/cache_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ type PodCache interface {
// GetPod retrieves a Pod object by name and namespace
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// Returns:
// *v1.Pod: Found pod object
// error: Error information if operation fails
GetPod(podName string) (*v1.Pod, error)
GetPod(podName, podNamespace string) (*v1.Pod, error)

// ListPodsByModel gets pods associated with a model
// Parameters:
Expand Down Expand Up @@ -66,33 +67,36 @@ type ModelCache interface {
// ListModelsByPod gets models associated with a pod
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// Returns:
// []string: Slice of model names
// error: Error information if operation fails
ListModelsByPod(podName string) ([]string, error)
ListModelsByPod(podName, podNamespace string) ([]string, error)
}

// MetricCache defines operations for metric data caching
type MetricCache interface {
// GetMetricValueByPod gets metric value for a pod
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// metricName: Name of the metric
// Returns:
// metrics.MetricValue: Retrieved metric value
// error: Error information if operation fails
GetMetricValueByPod(podName, metricName string) (metrics.MetricValue, error)
GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error)

// GetMetricValueByPodModel gets metric value for pod-model pair
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// modelName: Name of the model
// metricName: Name of the metric
// Returns:
// metrics.MetricValue: Retrieved metric value
// error: Error information if operation fails
GetMetricValueByPodModel(podName, modelName string, metricName string) (metrics.MetricValue, error)
GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error)

// AddSubscriber adds a metric subscriber
// Parameters:
Expand Down
34 changes: 22 additions & 12 deletions pkg/cache/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"github.com/vllm-project/aibrix/pkg/metrics"
"github.com/vllm-project/aibrix/pkg/types"
"github.com/vllm-project/aibrix/pkg/utils"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
Expand All @@ -30,15 +32,17 @@ import (
// Parameters:
//
// podName: Name of the pod to retrieve
// podNamespace: Namespace of the pod to retrieve
//
// Returns:
//
// *v1.Pod: The found Pod object
// error: Error if pod doesn't exist
func (c *Store) GetPod(podName string) (*v1.Pod, error) {
metaPod, ok := c.metaPods.Load(podName)
func (c *Store) GetPod(podName, podNamespace string) (*v1.Pod, error) {
key := utils.GeneratePodKey(podNamespace, podName)
metaPod, ok := c.metaPods.Load(key)
if !ok {
return nil, fmt.Errorf("pod does not exist in the cache: %s", podName)
return nil, fmt.Errorf("key does not exist in the cache: %s", key)
}

return metaPod.Pod, nil
Expand Down Expand Up @@ -102,15 +106,17 @@ func (c *Store) HasModel(modelName string) bool {
// Parameters:
//
// podName: Name of the Pod to query
// podNamespace: Namespace of the Pod to query
//
// Returns:
//
// []string: Slice of model names
// error: Error if Pod doesn't exist
func (c *Store) ListModelsByPod(podName string) ([]string, error) {
metaPod, ok := c.metaPods.Load(podName)
func (c *Store) ListModelsByPod(podName, podNamespace string) ([]string, error) {
key := utils.GeneratePodKey(podNamespace, podName)
metaPod, ok := c.metaPods.Load(key)
if !ok {
return nil, fmt.Errorf("pod does not exist in the cache: %s", podName)
return nil, fmt.Errorf("key does not exist in the cache: %s", key)
}

return metaPod.Models.Array(), nil
Expand All @@ -120,16 +126,18 @@ func (c *Store) ListModelsByPod(podName string) ([]string, error) {
// Parameters:
//
// podName: Name of the Pod
// podNamespace: Namespace of the Pod
// metricName: Name of the metric
//
// Returns:
//
// metrics.MetricValue: The metric value
// error: Error if Pod or metric doesn't exist
func (c *Store) GetMetricValueByPod(podName, metricName string) (metrics.MetricValue, error) {
metaPod, ok := c.metaPods.Load(podName)
func (c *Store) GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error) {
key := utils.GeneratePodKey(podNamespace, podName)
metaPod, ok := c.metaPods.Load(key)
if !ok {
return nil, fmt.Errorf("pod does not exist in the cache: %s", podName)
return nil, fmt.Errorf("key does not exist in the cache: %s", key)
}

return c.getPodMetricImpl(podName, &metaPod.Metrics, metricName)
Expand All @@ -139,17 +147,19 @@ func (c *Store) GetMetricValueByPod(podName, metricName string) (metrics.MetricV
// Parameters:
//
// podName: Name of the Pod
// podNamespace: Namespace of the Pod
// modelName: Name of the model
// metricName: Name of the metric
//
// Returns:
//
// metrics.MetricValue: The metric value
// error: Error if Pod, model or metric doesn't exist
func (c *Store) GetMetricValueByPodModel(podName, modelName string, metricName string) (metrics.MetricValue, error) {
metaPod, ok := c.metaPods.Load(podName)
func (c *Store) GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error) {
key := utils.GeneratePodKey(podNamespace, podName)
metaPod, ok := c.metaPods.Load(key)
if !ok {
return nil, fmt.Errorf("pod does not exist in the cache: %s", podName)
return nil, fmt.Errorf("key does not exist in the cache: %s", key)
}

return c.getPodMetricImpl(podName, &metaPod.ModelMetrics, c.getPodModelMetricName(modelName, metricName))
Expand Down
8 changes: 6 additions & 2 deletions pkg/cache/cache_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Store struct {
numRequestsTraces int32 // Request trace counter

// Pod related storage
metaPods utils.SyncMap[string, *Pod] // pod_name -> *Pod
metaPods utils.SyncMap[string, *Pod] // pod_namespace/pod_name -> *Pod

// Mapping relationships
metaModels utils.SyncMap[string, *Model] // model_name -> *Model
Expand Down Expand Up @@ -103,7 +103,11 @@ func NewTestCacheWithPods(pods []*v1.Pod, model string) *Store {

func NewTestCacheWithPodsMetrics(pods []*v1.Pod, model string, podMetrics map[string]map[string]metrics.MetricValue) *Store {
c := NewTestCacheWithPods(pods, model)
c.metaPods.Range(func(podName string, metaPod *Pod) bool {
c.metaPods.Range(func(key string, metaPod *Pod) bool {
_, podName, ok := utils.ParsePodKey(key)
if !ok {
return true
}
if podmetrics, ok := podMetrics[podName]; ok {
for metricName, metric := range podmetrics {
if err := c.updatePodRecord(metaPod, model, metricName, metrics.PodMetricScope, metric); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/cache/cache_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/vllm-project/aibrix/pkg/metrics"
"github.com/vllm-project/aibrix/pkg/utils"
"k8s.io/klog/v2"
)

Expand All @@ -28,7 +29,11 @@ func (c *Store) debugInfo() {
return
}

c.metaPods.Range(func(podName string, pod *Pod) bool {
c.metaPods.Range(func(key string, pod *Pod) bool {
_, podName, ok := utils.ParsePodKey(key)
if !ok {
return true
}
klog.V(4).Infof("pod: %s, podIP: %v, models: %s", podName, pod.Status.PodIP, strings.Join(pod.Models.Array(), " "))
pod.Metrics.Range(func(metricName string, metricVal metrics.MetricValue) bool {
klog.V(5).Infof("%v_%v_%v", podName, metricName, metricVal)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/cache_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *Store) getPodModelMetricName(modelName string, metricName string) strin
}

func (c *Store) updatePodMetrics() {
c.metaPods.Range(func(podName string, metaPod *Pod) bool {
c.metaPods.Range(func(key string, metaPod *Pod) bool {
if !utils.FilterReadyPod(metaPod.Pod) {
// Skip unready pod
return true
Expand Down
Loading
Loading