4 changes: 4 additions & 0 deletions config/samples/autoscaling_v1alpha1_mock_llama.yaml
@@ -5,6 +5,10 @@ metadata:
labels:
app.kubernetes.io/name: aibrix
app.kubernetes.io/managed-by: kustomize
autoscaling.aibrix.ai/max-scale-up-rate: "2"
autoscaling.aibrix.ai/max-scale-down-rate: "2"
kpa.autoscaling.aibrix.ai/stable-window: "60s"
kpa.autoscaling.aibrix.ai/scale-down-delay: "60s"
Collaborator:
Do we plan to use other units like minutes? If not, should we always use seconds, so we do not need to parse the "s" there?

Collaborator (Author):
> Do we plan to use other units like minutes? If not, should we always use seconds, so we do not need to parse the "s" there?

I think we should keep the "30s" form, for two reasons:

  • It mirrors the KPA setting, e.g. autoscaling.knative.dev/window: "40s" (doc).
  • Go has a built-in function that parses "30s" into a time.Duration; if we only kept 30, we would still need to multiply by time.Second manually. A minimal sketch follows below.
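
For illustration, a minimal sketch of the standard-library parsing mentioned above (the values here are made up, not taken from the PR):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "60s" parses directly into a time.Duration; units such as "1m" or "500ms" also work.
	d, err := time.ParseDuration("60s")
	if err != nil {
		panic(err)
	}
	fmt.Println(d) // prints "1m0s"

	// If the annotation only carried "60", we would have to convert manually:
	seconds := 60
	manual := time.Duration(seconds) * time.Second
	fmt.Println(manual) // prints "1m0s"
}
```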

namespace: aibrix-system
spec:
scaleTargetRef:
46 changes: 46 additions & 0 deletions pkg/controller/podautoscaler/common/context.go
@@ -16,6 +16,19 @@ limitations under the License.

package common

import (
"strconv"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
"k8s.io/klog/v2"
)

const (
AutoscalingLabelPrefix = "autoscaling.aibrix.ai/"
maxScaleUpRateLabel = AutoscalingLabelPrefix + "max-scale-up-rate"
maxScaleDownRateLabel = AutoscalingLabelPrefix + "max-scale-down-rate"
)

// ScalingContext defines the generalized context that holds all necessary data for scaling calculations.
type ScalingContext interface {
GetTargetValue() float64
@@ -24,6 +37,7 @@ type ScalingContext interface {
GetMaxScaleUpRate() float64
GetMaxScaleDownRate() float64
GetCurrentUsePerPod() float64
UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error
Collaborator:
What does UpdateByPaTypes mean? Does it try to build the context from the object?

Collaborator (Author):
> What does UpdateByPaTypes mean? Does it try to build the context from the object?

Yes. When KpaAutoscaler or ApaAutoscaler invokes UpdateScalingContext(pa), it needs the pa object to update KpaScalingContext or ApaScalingContext accordingly.

Another benefit is that KpaScalingContext and ApaScalingContext share common parameters through BaseScalingContext, so both can call BaseScalingContext.UpdateByPaTypes and avoid duplicating code. A rough sketch of the pattern is below.
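
As a self-contained sketch of that embedding pattern (simplified stand-in types, not the actual AIBrix code):

```go
package main

import (
	"fmt"
	"strconv"
)

// Simplified stand-ins for the real types, only to show the call pattern.
type PodAutoscaler struct {
	Labels map[string]string
}

type BaseScalingContext struct {
	MaxScaleUpRate float64
}

// UpdateByPaTypes parses the labels shared by every scaling context.
func (b *BaseScalingContext) UpdateByPaTypes(pa *PodAutoscaler) error {
	if v, ok := pa.Labels["autoscaling.aibrix.ai/max-scale-up-rate"]; ok {
		f, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return err
		}
		b.MaxScaleUpRate = f
	}
	return nil
}

type KpaScalingContext struct {
	BaseScalingContext // embedded: KPA reuses the common parsing
	PanicThreshold     float64
}

// The KPA-specific version delegates to the base first, then parses its own labels.
func (k *KpaScalingContext) UpdateByPaTypes(pa *PodAutoscaler) error {
	if err := k.BaseScalingContext.UpdateByPaTypes(pa); err != nil {
		return err
	}
	if v, ok := pa.Labels["kpa.autoscaling.aibrix.ai/panic-threshold"]; ok {
		f, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return err
		}
		k.PanicThreshold = f
	}
	return nil
}

func main() {
	pa := &PodAutoscaler{Labels: map[string]string{
		"autoscaling.aibrix.ai/max-scale-up-rate":   "2",
		"kpa.autoscaling.aibrix.ai/panic-threshold": "2.5",
	}}
	k := &KpaScalingContext{}
	if err := k.UpdateByPaTypes(pa); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(k.MaxScaleUpRate, k.PanicThreshold) // 2 2.5
}
```

The real contexts parse more labels, but the delegation shape is the same.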

}

// BaseScalingContext provides a base implementation of the ScalingContext interface.
@@ -42,6 +56,8 @@ type BaseScalingContext struct {
currentUsePerPod float64
}

var _ ScalingContext = (*BaseScalingContext)(nil)

// NewBaseScalingContext creates a new instance of BaseScalingContext with default values.
func NewBaseScalingContext() *BaseScalingContext {
return &BaseScalingContext{
@@ -53,6 +69,36 @@ func NewBaseScalingContext() *BaseScalingContext {
}
}

// UpdateByPaTypes should be invoked in any scaling context that embeds BaseScalingContext.
func (b *BaseScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
b.ScalingMetric = pa.Spec.TargetMetric
// parse target value
targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64)
if err != nil {
klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue)
return err
}
b.TargetValue = targetValue

for key, value := range pa.Labels {
switch key {
case maxScaleUpRateLabel:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return err
}
b.MaxScaleUpRate = v
case maxScaleDownRateLabel:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return err
}
b.MaxScaleDownRate = v
}
}
return nil
}

func (b *BaseScalingContext) SetCurrentUsePerPod(value float64) {
b.currentUsePerPod = value
}
1 change: 1 addition & 0 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
@@ -602,6 +602,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,

err = r.updateScalerSpec(ctx, pa)
if err != nil {
klog.ErrorS(err, "Failed to update scaler spec from pa_types")
return 0, "", currentTimestamp, fmt.Errorf("error update scaler spec: %w", err)
}

30 changes: 30 additions & 0 deletions pkg/controller/podautoscaler/scaler/apa.go
@@ -34,6 +34,12 @@ import (
"k8s.io/klog/v2"
)

const (
APALabelPrefix = "apa." + scalingcontext.AutoscalingLabelPrefix
upFluctuationToleranceLabel = APALabelPrefix + "up-fluctuation-tolerance"
downFluctuationToleranceLabel = APALabelPrefix + "down-fluctuation-tolerance"
)

// ApaScalingContext defines parameters for scaling decisions.
type ApaScalingContext struct {
scalingcontext.BaseScalingContext
@@ -85,6 +91,30 @@ func NewApaAutoscaler(readyPodsCount int, spec *ApaScalingContext) (*ApaAutoscal
}, nil
}

func (a *ApaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
err := a.BaseScalingContext.UpdateByPaTypes(pa)
if err != nil {
return err
}
for key, value := range pa.Labels {
switch key {
case upFluctuationToleranceLabel:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return err
}
a.UpFluctuationTolerance = v
case downFluctuationToleranceLabel:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return err
}
a.DownFluctuationTolerance = v
}
}
return nil
}

func (a *ApaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.NamespaceNameMetric, now time.Time) ScaleResult {
spec, ok := a.GetScalingContext().(*ApaScalingContext)
if !ok {
53 changes: 53 additions & 0 deletions pkg/controller/podautoscaler/scaler/apa_test.go
@@ -20,6 +20,10 @@ import (
"testing"
"time"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
)

@@ -67,3 +71,52 @@ func TestAPAScale(t *testing.T) {
t.Errorf("result should remain previous replica = %d, but got %d", readyPodCount, result.DesiredPodCount)
}
}

func TestApaUpdateContext(t *testing.T) {
pa := &autoscalingv1alpha1.PodAutoscaler{
Spec: autoscalingv1alpha1.PodAutoscalerSpec{
ScaleTargetRef: corev1.ObjectReference{
Kind: "Deployment",
Name: "example-deployment",
},
MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned
MaxReplicas: 5,
TargetValue: "1",
TargetMetric: "test.metrics",
MetricsSources: []autoscalingv1alpha1.MetricSource{
{
Endpoint: "service1.example.com",
Path: "/api/metrics/cpu",
},
},
ScalingStrategy: "APA",
},
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"autoscaling.aibrix.ai/max-scale-up-rate": "32.1",
"autoscaling.aibrix.ai/max-scale-down-rate": "12.3",
"apa.autoscaling.aibrix.ai/up-fluctuation-tolerance": "1.2",
"apa.autoscaling.aibrix.ai/down-fluctuation-tolerance": "0.9",
},
},
}
apaSpec := NewApaScalingContext()
err := apaSpec.UpdateByPaTypes(pa)
if err != nil {
t.Errorf("Failed to update KpaScalingContext: %v", err)
}
if apaSpec.MaxScaleUpRate != 32.1 {
t.Errorf("expected MaxScaleDownRate = 32.1, got %f", apaSpec.MaxScaleDownRate)
}
if apaSpec.MaxScaleDownRate != 12.3 {
t.Errorf("expected MaxScaleDownRate = 12.3, got %f", apaSpec.MaxScaleDownRate)
}

if apaSpec.UpFluctuationTolerance != 1.2 {
t.Errorf("expected UpFluctuationTolerance = 1.2, got %f", apaSpec.UpFluctuationTolerance)
}
if apaSpec.DownFluctuationTolerance != 0.9 {
t.Errorf("expected DownFluctuationTolerance = 0.9, got %f", apaSpec.DownFluctuationTolerance)
}

}
77 changes: 69 additions & 8 deletions pkg/controller/podautoscaler/scaler/kpa.go
@@ -59,6 +59,15 @@ If the metric no longer exceeds the panic threshold, exit the panic mode.

*/

const (
KPALabelPrefix = "kpa." + scalingcontext.AutoscalingLabelPrefix
targetBurstCapacityLabel = KPALabelPrefix + "target-burst-capacity"
activationScaleLabel = KPALabelPrefix + "activation-scale"
panicThresholdLabel = KPALabelPrefix + "panic-threshold"
stableWindowLabel = KPALabelPrefix + "stable-window"
scaleDownDelayLabel = KPALabelPrefix + "scale-down-delay"
)

// KpaScalingContext defines parameters for scaling decisions.
type KpaScalingContext struct {
scalingcontext.BaseScalingContext
@@ -100,6 +109,57 @@ func NewKpaScalingContext() *KpaScalingContext {
}
}

func (k *KpaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
err := k.BaseScalingContext.UpdateByPaTypes(pa)
if err != nil {
return err
}
for key, value := range pa.Labels {
switch key {
case targetBurstCapacityLabel:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return err
}
k.TargetBurstCapacity = v
case activationScaleLabel:
v, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return err
}
k.ActivationScale = int32(v)
case panicThresholdLabel:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return err
}
k.PanicThreshold = v
case stableWindowLabel:
v, err := time.ParseDuration(value)
if err != nil {
return err
}
k.StableWindow = v
case scaleDownDelayLabel:
v, err := time.ParseDuration(value)
if err != nil {
return err
}
k.ScaleDownDelay = v
}
}
// Unset the attribute when it is not configured via labels.
if _, exists := pa.Labels[scaleDownDelayLabel]; !exists {
// TODO N.B. three parts of the KPA autoscaler are stateful: the panic, stable and delay windows.
// reconcile() updates KpaScalingContext periodically, but does not reset these three windows;
// they are only initialized when the controller starts.
// Therefore, applying kpa.yaml cannot modify the panic, stable and delay window durations.
k.ScaleDownDelay = 0
}

return nil
}

type KpaAutoscaler struct {
specMux sync.RWMutex
metricClient metrics.MetricClient
@@ -122,10 +182,14 @@ func NewKpaAutoscaler(readyPodsCount int, spec *KpaScalingContext) (*KpaAutoscal
}

// Create a new delay window based on the ScaleDownDelay specified in the spec
if spec.ScaleDownDelay <= 0 {
if spec.ScaleDownDelay < 0 {
return nil, errors.New("ScaleDownDelay must be non-negative")
}
delayWindow := aggregation.NewTimeWindow(spec.ScaleDownDelay, 1*time.Second)
var delayWindow *aggregation.TimeWindow
// If ScaleDownDelay is specified, KpaAutoscaler.delayWindow will be initialized
if spec.ScaleDownDelay > 0 {
delayWindow = aggregation.NewTimeWindow(spec.ScaleDownDelay, 1*time.Second)
}

// As KNative stated:
// We always start in the panic mode, if the deployment is scaled up over 1 pod.
@@ -144,6 +208,7 @@
// TODO missing MetricClient
metricsFetcher := &metrics.RestMetricsFetcher{}
metricsClient := metrics.NewKPAMetricsClient(metricsFetcher)

scalingAlgorithm := algorithm.KpaScalingAlgorithm{}

return &KpaAutoscaler{
@@ -253,6 +318,7 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
// in that case).
klog.V(4).InfoS("DelayWindow details", "delayWindow", k.delayWindow.String())
if k.delayWindow != nil {
// The actual desiredPodCount is recorded, but the max replicas seen within the delay window is returned.
k.delayWindow.Record(now, float64(desiredPodCount))
delayedPodCount, err := k.delayWindow.Max()
if err != nil {
@@ -319,15 +385,10 @@ func (k *KpaAutoscaler) UpdateSourceMetrics(ctx context.Context, metricKey metri
func (k *KpaAutoscaler) UpdateScalingContext(pa autoscalingv1alpha1.PodAutoscaler) error {
k.specMux.Lock()
defer k.specMux.Unlock()

targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64)
err := k.scalingContext.UpdateByPaTypes(&pa)
if err != nil {
klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue)
return err
}
k.scalingContext.TargetValue = targetValue
k.scalingContext.ScalingMetric = pa.Spec.TargetMetric

return nil
}

63 changes: 63 additions & 0 deletions pkg/controller/podautoscaler/scaler/kpa_test.go
@@ -20,6 +20,10 @@ import (
"testing"
"time"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/common"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
@@ -70,3 +74,62 @@ func TestKpaScale(t *testing.T) {
t.Errorf("result.DesiredPodCount = 10, got %d", result.DesiredPodCount)
}
}

func TestKpaUpdateContext(t *testing.T) {
pa := &autoscalingv1alpha1.PodAutoscaler{
Spec: autoscalingv1alpha1.PodAutoscalerSpec{
ScaleTargetRef: corev1.ObjectReference{
Kind: "Deployment",
Name: "example-deployment",
},
MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned
MaxReplicas: 5,
TargetValue: "1",
TargetMetric: "test.metrics",
MetricsSources: []autoscalingv1alpha1.MetricSource{
{
Endpoint: "service1.example.com",
Path: "/api/metrics/cpu",
},
},
ScalingStrategy: "KPA",
},
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"autoscaling.aibrix.ai/max-scale-up-rate": "32.1",
"autoscaling.aibrix.ai/max-scale-down-rate": "12.3",
"kpa.autoscaling.aibrix.ai/target-burst-capacity": "45.6",
"kpa.autoscaling.aibrix.ai/activation-scale": "3",
"kpa.autoscaling.aibrix.ai/panic-threshold": "2.5",
"kpa.autoscaling.aibrix.ai/stable-window": "60s",
"kpa.autoscaling.aibrix.ai/scale-down-delay": "30s",
},
},
}
kpaSpec := NewKpaScalingContext()
err := kpaSpec.UpdateByPaTypes(pa)
if err != nil {
t.Errorf("Failed to update KpaScalingContext: %v", err)
}
if kpaSpec.MaxScaleUpRate != 32.1 {
t.Errorf("expected MaxScaleDownRate = 32.1, got %f", kpaSpec.MaxScaleDownRate)
}
if kpaSpec.MaxScaleDownRate != 12.3 {
t.Errorf("expected MaxScaleDownRate = 12.3, got %f", kpaSpec.MaxScaleDownRate)
}
if kpaSpec.TargetBurstCapacity != 45.6 {
t.Errorf("expected TargetBurstCapacity = 45.6, got %f", kpaSpec.TargetBurstCapacity)
}
if kpaSpec.ActivationScale != 3 {
t.Errorf("expected ActivationScale = 3, got %d", kpaSpec.ActivationScale)
}
if kpaSpec.PanicThreshold != 2.5 {
t.Errorf("expected PanicThreshold = 2.5, got %f", kpaSpec.PanicThreshold)
}
if kpaSpec.StableWindow != 60*time.Second {
t.Errorf("expected StableWindow = 60s, got %v", kpaSpec.StableWindow)
}
if kpaSpec.ScaleDownDelay != 30*time.Second {
t.Errorf("expected ScaleDownDelay = 10s, got %v", kpaSpec.ScaleDownDelay)
}
}