
Commit 4a125c2: Adjust autoscaler interface

Parent: ee412be

5 files changed: +117 additions, -118 deletions


pkg/controller/podautoscaler/podautoscaler_controller.go

Lines changed: 22 additions & 15 deletions
@@ -21,6 +21,8 @@ import (
     "fmt"
     "time"
 
+    "github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler"
+
     autoscalingv2 "k8s.io/api/autoscaling/v2"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
     apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -90,6 +92,7 @@ type PodAutoscalerReconciler struct {
     Scheme        *runtime.Scheme
     EventRecorder record.EventRecorder
     Mapper        apimeta.RESTMapper
+    Autoscaler    scaler.Scaler
 }
 
 //+kubebuilder:rbac:groups=autoscaling.aibrix.ai,resources=podautoscalers,verbs=get;list;watch;create;update;patch;delete
@@ -189,7 +192,6 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
         Group: targetGV.Group,
         Kind:  pa.Spec.ScaleTargetRef.Kind,
     }
-
     mappings, err := r.Mapper.RESTMappings(targetGK)
     if err != nil {
         r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedGetScale", err.Error())
@@ -201,7 +203,7 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
     }
 
     // TODO: retrieval targetGR for future scale update
-    scale, _, err := r.scaleForResourceMappings(ctx, pa.Namespace, pa.Spec.ScaleTargetRef.Name, mappings)
+    scale, targetGR, err := r.scaleForResourceMappings(ctx, pa.Namespace, pa.Spec.ScaleTargetRef.Name, mappings)
     if err != nil {
         r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedGetScale", err.Error())
         setCondition(&pa, "AbleToScale", metav1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
@@ -271,24 +273,27 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
 
     if rescale {
         scale.Spec.Replicas = desiredReplicas
-        // TODO: invoke scale interface to scale the scaleTarget
-        // no need to use targetGR?
         r.EventRecorder.Eventf(&pa, corev1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
         setCondition(&pa, "AbleToScale", metav1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
         r.setCurrentReplicasAndMetricsInStatus(&pa, currentReplicas)
         if err := r.updateStatusIfNeeded(ctx, paStatusOriginal, &pa); err != nil {
             utilruntime.HandleError(err)
         }
-        if err := r.Client.SubResource("scale").Update(ctx, scale); err != nil {
+
+        if err := r.updateScale(ctx, pa.Namespace, targetGR, scale); err != nil {
             return ctrl.Result{}, fmt.Errorf("failed to rescale %s: %v", scaleReference, err)
         }
 
+        // which way to go?. not sure the best practice in controller-runtime
+        //if err := r.Client.SubResource("scale").Update(ctx, scale); err != nil {
+        //    return ctrl.Result{}, fmt.Errorf("failed to rescale %s: %v", scaleReference, err)
+        //}
+
         logger.Info("Successfully rescaled",
             //"PodAutoscaler", klog.KObj(pa),
             "currentReplicas", currentReplicas,
             "desiredReplicas", desiredReplicas,
             "reason", rescaleReason)
-
     }
 
     if err := r.updateStatusIfNeeded(ctx, paStatusOriginal, &pa); err != nil {
@@ -339,9 +344,9 @@ func (r *PodAutoscalerReconciler) scaleForResourceMappings(ctx context.Context,
     return nil, schema.GroupResource{}, firstErr
 }
 
-func updateScale(ctx context.Context, c client.Client, namespace string, targetGR schema.GroupResource, scale *autoscalingv1.Scale) error {
+func (r *PodAutoscalerReconciler) updateScale(ctx context.Context, namespace string, targetGR schema.GroupResource, scale *autoscalingv1.Scale) error {
     // Get GVK
-    gvk, err := apiutil.GVKForObject(scale, c.Scheme())
+    gvk, err := apiutil.GVKForObject(scale, r.Client.Scheme())
     if err != nil {
         return err
     }
@@ -353,8 +358,8 @@ func updateScale(ctx context.Context, c client.Client, namespace string, targetG
     scaleObj.SetName(scale.Name)
 
     // Update scale object
-    // TODO: change to kind name later.
-    err = c.Patch(ctx, scale, client.Apply, client.FieldOwner("operator-name"))
+    //err = r.Client.Patch(ctx, scale, client.Apply, client.FieldOwner("operator-name"))
+    err = r.Client.Patch(ctx, scale, client.Apply)
     if err != nil {
         return err
     }
@@ -416,9 +421,11 @@ func (r *PodAutoscalerReconciler) updateStatus(ctx context.Context, pa *autoscal
 // when some metrics still work and HPA should perform scaling based on them.
 // If PodAutoscaler cannot do anything due to error, it returns -1 in metricDesiredReplicas as a failure signal.
 func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *autoscalingv1.Scale) (replicas int32, metrics string, timestamp time.Time, err error) {
-    panic("not implemented")
-}
+    currentTimestamp := time.Now()
+    scaleResult := r.Autoscaler.Scale(0, 0, currentTimestamp)
+    if scaleResult.ScaleValid {
+        return scaleResult.DesiredPodCount, "", currentTimestamp, nil
+    }
 
-// TODO: define the condition type to reconcile.
-// PodAutoscalerConditionType are the valid conditions of a PodAutoscaler.
-type PodAutoscalerConditionType string
+    return 0, "", currentTimestamp, fmt.Errorf("can not calculate metrics for scale %s", scale.Name)
+}
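
With this change the controller no longer patches the scale subresource inline: it keeps the targetGR returned by scaleForResourceMappings, routes the update through the new updateScale helper, and delegates the replica decision to the injected Autoscaler field. A minimal wiring sketch is shown below; the package name podautoscaler, the aggregation import path, and the DeciderSpec values are illustrative assumptions and not part of this commit.

```go
package main

import (
	"time"

	"github.com/aibrix/aibrix/pkg/controller/podautoscaler"
	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/aggregation"
	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler"
	ctrl "sigs.k8s.io/controller-runtime"
)

// newReconciler is a hypothetical wiring sketch: build a KPA-backed Scaler and
// inject it into the reconciler so computeReplicasForMetrics can call
// r.Autoscaler.Scale. Spec values and import paths are assumptions.
func newReconciler(mgr ctrl.Manager) (*podautoscaler.PodAutoscalerReconciler, error) {
	kpa, err := scaler.NewKpaAutoscaler(
		0, // ready pods known at startup
		&scaler.DeciderSpec{
			MaxScaleUpRate:   1.5,
			MaxScaleDownRate: 0.75,
			TargetValue:      10, // assumed per-pod target
		},
		time.Time{}, // not in panic mode initially
		10,          // maxPanicPods
		aggregation.NewTimeWindow(30*time.Second, 1*time.Second),
	)
	if err != nil {
		return nil, err
	}

	return &podautoscaler.PodAutoscalerReconciler{
		Client:        mgr.GetClient(),
		Scheme:        mgr.GetScheme(),
		EventRecorder: mgr.GetEventRecorderFor("podautoscaler-controller"),
		Mapper:        mgr.GetRESTMapper(),
		Autoscaler:    kpa, // any scaler.Scaler implementation fits here
	}, nil
}
```

Because the field is typed as the Scaler interface rather than a concrete struct, a different scaling strategy can be swapped in without touching the reconcile loop.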

pkg/controller/podautoscaler/scaler/interfaces.go

Lines changed: 10 additions & 59 deletions
@@ -19,6 +19,9 @@ package scaler
 import (
     "sync"
     "time"
+
+    "github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
+    "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 /**
@@ -35,17 +38,17 @@ Our implementation specifically mimics and adapts the autoscaling functionality
 
 // Autoscaler represents an instance of the autoscaling engine.
 // It encapsulates all the necessary data and state needed for scaling decisions.
-// Refer to: KpaScaler
+// Refer to: KpaAutoscaler
 type Autoscaler struct {
     // specMux guards the current DeciderSpec.
-    specMux     sync.RWMutex
-    podCounter  int
-    deciderSpec *DeciderSpec
-    Status      DeciderStatus
+    specMux        sync.RWMutex
+    metricsClient  metrics.MetricsClient
+    resourceClient client.Client
+    scaler         Scaler
 }
 
 // Scaler is an interface that defines the scaling operations.
-// Any autoscaler implementation, such as KpaScaler (Kubernetes Pod Autoscaler),
+// Any autoscaler implementation, such as KpaAutoscaler (Kubernetes Pod Autoscaler),
 // needs to implement this interface to respond to scale events.
 type Scaler interface {
     // Scale calculates the necessary scaling action based on the observed metrics
@@ -59,62 +62,10 @@ type Scaler interface {
     // Returns:
     // ScaleResult which contains the recommended number of pods to scale up or down to.
     //
-    // Refer to: KpaScaler.Scale Implementation
+    // Refer to: KpaAutoscaler.Scale Implementation
     Scale(observedStableValue float64, observedPanicValue float64, now time.Time) ScaleResult
 }
 
-// DeciderSpec defines parameters for scaling decisions.
-type DeciderSpec struct {
-    // Maximum rate at which to scale up
-    MaxScaleUpRate float64
-    // Maximum rate at which to scale down
-    MaxScaleDownRate float64
-    // The metric used for scaling, i.e. CPU, Memory, QPS.
-    ScalingMetric string
-    // The value of scaling metric per pod that we target to maintain.
-    TargetValue float64
-    // The total value of scaling metric that a pod can maintain.
-    TotalValue float64
-    // The burst capacity that user wants to maintain without queuing at the POD level.
-    // Note, that queueing still might happen due to the non-ideal load balancing.
-    TargetBurstCapacity float64
-    // ActivationScale is the minimum, non-zero value that a service should scale to.
-    // For example, if ActivationScale = 2, when a service scaled from zero it would
-    // scale up two replicas in this case. In essence, this allows one to set both a
-    // min-scale value while also preserving the ability to scale to zero.
-    // ActivationScale must be >= 2.
-    ActivationScale int32
-
-    // TODO: Note that the following attributes are specific to Knative; but we retain them here temporarily.
-    // PanicThreshold is the threshold at which panic mode is entered. It represents
-    // a factor of the currently observed load over the panic window over the ready
-    // pods. I.e. if this is 2, panic mode will be entered if the observed metric
-    // is twice as high as the current population can handle.
-    PanicThreshold float64
-    // StableWindow is needed to determine when to exit panic mode.
-    StableWindow time.Duration
-    // ScaleDownDelay is the time that must pass at reduced concurrency before a
-    // scale-down decision is applied.
-    ScaleDownDelay time.Duration
-}
-
-// DeciderStatus is the current scale recommendation.
-type DeciderStatus struct {
-    // DesiredScale is the target number of instances that autoscaler
-    // this revision needs.
-    DesiredScale int32
-
-    // TODO: ExcessBurstCapacity might be a general attribute since it describes
-    // how much capacity users want to keep for preparing for burst traffic.
-
-    // ExcessBurstCapacity is the difference between spare capacity
-    // (how much more load the pods in the revision deployment can take before being
-    // overloaded) and the configured target burst capacity.
-    // If this number is negative: Activator will be threaded in
-    // the request path by the PodAutoscaler controller.
-    ExcessBurstCapacity int32
-}
-
 // ScaleResult contains the results of a scaling decision.
 type ScaleResult struct {
     // DesiredPodCount is the number of pods Autoscaler suggests for the revision.
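
After this commit, interfaces.go keeps only the shared pieces: Autoscaler carries the metrics and resource clients, and Scaler remains the behavioural contract, with the decider types moved into kpa.go. A minimal, hypothetical implementation of that contract is sketched below; it relies only on the Scale signature shown above and on the DesiredPodCount and ScaleValid fields of ScaleResult that the controller reads, and it is not part of this commit.

```go
package scaler

import (
	"math"
	"time"
)

// staticScaler is a hypothetical, minimal Scaler used only to illustrate the
// interface contract; it is not part of this commit.
type staticScaler struct {
	// targetValuePerPod is the metric value each pod is expected to absorb.
	targetValuePerPod float64
}

// Scale recommends ceil(observedStableValue / targetValuePerPod) pods and
// ignores the panic signal entirely.
func (s *staticScaler) Scale(observedStableValue, observedPanicValue float64, now time.Time) ScaleResult {
	if s.targetValuePerPod <= 0 {
		// Signal that no valid recommendation could be produced.
		return ScaleResult{ScaleValid: false}
	}
	desired := int32(math.Ceil(observedStableValue / s.targetValuePerPod))
	return ScaleResult{
		DesiredPodCount: desired,
		ScaleValid:      true,
	}
}

// Compile-time check that staticScaler satisfies the Scaler interface.
var _ Scaler = (*staticScaler)(nil)
```

An invalid result maps directly onto the controller path added in this commit: computeReplicasForMetrics returns an error when ScaleValid is false.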

pkg/controller/podautoscaler/scaler/kpa.go

Lines changed: 81 additions & 26 deletions
@@ -26,11 +26,11 @@ import (
 )
 
 /**
-This implementation of the algorithm is based on both the Knative KpaScaler code and its documentation.
+This implementation of the algorithm is based on both the Knative KpaAutoscaler code and its documentation.
 
-According to Knative documentation, the KpaScaler Scale policy includes both a stable mode and a panic mode.
-If the metric usage does not exceed the panic threshold, KpaScaler tries to align the per-pod metric usage with the stable target value.
-If metric usage exceeds the panic target during the panic window, KpaScaler enters panic mode and tries to maintain the per-pod metric usage at the panic target.
+According to Knative documentation, the KpaAutoscaler Scale policy includes both a stable mode and a panic mode.
+If the metric usage does not exceed the panic threshold, KpaAutoscaler tries to align the per-pod metric usage with the stable target value.
+If metric usage exceeds the panic target during the panic window, KpaAutoscaler enters panic mode and tries to maintain the per-pod metric usage at the panic target.
 If the metric no longer exceeds the panic threshold, exit the panic mode.
 
 |
@@ -47,45 +47,100 @@ If the metric no longer exceeds the panic threshold, exit the panic mode.
 
 */
 
-type KpaScaler struct {
-    scaler *Autoscaler
+// DeciderSpec defines parameters for scaling decisions.
+type DeciderSpec struct {
+    // Maximum rate at which to scale up
+    MaxScaleUpRate float64
+    // Maximum rate at which to scale down
+    MaxScaleDownRate float64
+    // The metric used for scaling, i.e. CPU, Memory, QPS.
+    ScalingMetric string
+    // The value of scaling metric per pod that we target to maintain.
+    TargetValue float64
+    // The total value of scaling metric that a pod can maintain.
+    TotalValue float64
+    // The burst capacity that user wants to maintain without queuing at the POD level.
+    // Note, that queueing still might happen due to the non-ideal load balancing.
+    TargetBurstCapacity float64
+    // ActivationScale is the minimum, non-zero value that a service should scale to.
+    // For example, if ActivationScale = 2, when a service scaled from zero it would
+    // scale up two replicas in this case. In essence, this allows one to set both a
+    // min-scale value while also preserving the ability to scale to zero.
+    // ActivationScale must be >= 2.
+    ActivationScale int32
+
+    // TODO: Note that the following attributes are specific to Knative; but we retain them here temporarily.
+    // PanicThreshold is the threshold at which panic mode is entered. It represents
+    // a factor of the currently observed load over the panic window over the ready
+    // pods. I.e. if this is 2, panic mode will be entered if the observed metric
+    // is twice as high as the current population can handle.
+    PanicThreshold float64
+    // StableWindow is needed to determine when to exit panic mode.
+    StableWindow time.Duration
+    // ScaleDownDelay is the time that must pass at reduced concurrency before a
+    // scale-down decision is applied.
+    ScaleDownDelay time.Duration
+}
+
+// DeciderStatus is the current scale recommendation.
+type DeciderStatus struct {
+    // DesiredScale is the target number of instances that autoscaler
+    // this revision needs.
+    DesiredScale int32
+
+    // TODO: ExcessBurstCapacity might be a general attribute since it describes
+    // how much capacity users want to keep for preparing for burst traffic.
+
+    // ExcessBurstCapacity is the difference between spare capacity
+    // (how much more load the pods in the revision deployment can take before being
+    // overloaded) and the configured target burst capacity.
+    // If this number is negative: Activator will be threaded in
+    // the request path by the PodAutoscaler controller.
+    ExcessBurstCapacity int32
+}
+
+type KpaAutoscaler struct {
+    *Autoscaler
     panicTime    time.Time
     maxPanicPods int32
     delayWindow  *aggregation.TimeWindow
+    podCounter   int
+    deciderSpec  *DeciderSpec
+    Status       DeciderStatus
 }
 
-func NewKpaScaler(readyPodsCount int, spec *DeciderSpec, panicTime time.Time,
-    maxPanicPods int32, delayWindow *aggregation.TimeWindow) (*KpaScaler, error) {
+var _ Scaler = (*KpaAutoscaler)(nil)
+
+func NewKpaAutoscaler(readyPodsCount int, spec *DeciderSpec, panicTime time.Time,
+    maxPanicPods int32, delayWindow *aggregation.TimeWindow) (*KpaAutoscaler, error) {
     if spec == nil {
         return nil, errors.New("spec cannot be nil")
     }
     if delayWindow == nil {
         return nil, errors.New("delayWindow cannot be nil")
     }
-    scaler := &Autoscaler{
-        podCounter:  readyPodsCount,
-        deciderSpec: spec,
-    }
-    return &KpaScaler{
-        scaler:       scaler,
+    autoscaler := &Autoscaler{}
+    return &KpaAutoscaler{
+        Autoscaler:   autoscaler,
+        podCounter:   readyPodsCount,
         panicTime:    panicTime,
         maxPanicPods: maxPanicPods,
         delayWindow:  delayWindow,
+        deciderSpec:  spec,
     }, nil
 }
 
-// Scale implements Scaler interface in KpaScaler.
-func (k *KpaScaler) Scale(observedStableValue float64, observedPanicValue float64, now time.Time) ScaleResult {
+// Scale implements Scaler interface in KpaAutoscaler.
+func (k *KpaAutoscaler) Scale(observedStableValue float64, observedPanicValue float64, now time.Time) ScaleResult {
     /**
     `observedStableValue` and `observedPanicValue` are calculated using different window sizes in the `MetricClient`.
     For reference, see the KNative implementation at `pkg/autoscaler/metrics/collector.go:185`.
     */
-    a := k.scaler
-    a.specMux.RLock()
-    spec := a.deciderSpec
-    a.specMux.RUnlock()
+    k.specMux.RLock()
+    spec := k.deciderSpec
+    k.specMux.RUnlock()
 
-    originalReadyPodsCount := a.podCounter
+    originalReadyPodsCount := k.podCounter
     // Use 1 if there are zero current pods.
     readyPodsCount := math.Max(1, float64(originalReadyPodsCount))
 
@@ -110,12 +165,12 @@ func (k *KpaScaler) Scale(observedStableValue float64, observedPanicValue float6
     desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
 
     // If ActivationScale > 1, then adjust the desired pod counts
-    if a.deciderSpec.ActivationScale > 1 {
-        if dspc > 0 && a.deciderSpec.ActivationScale > desiredStablePodCount {
-            desiredStablePodCount = a.deciderSpec.ActivationScale
+    if k.deciderSpec.ActivationScale > 1 {
+        if dspc > 0 && k.deciderSpec.ActivationScale > desiredStablePodCount {
+            desiredStablePodCount = k.deciderSpec.ActivationScale
         }
-        if dppc > 0 && a.deciderSpec.ActivationScale > desiredPanicPodCount {
-            desiredPanicPodCount = a.deciderSpec.ActivationScale
+        if dppc > 0 && k.deciderSpec.ActivationScale > desiredPanicPodCount {
+            desiredPanicPodCount = k.deciderSpec.ActivationScale
         }
     }
 
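
KpaAutoscaler.Scale (partially shown above) turns the two observed values into stable and panic pod counts and clamps each between maxScaleDown and maxScaleUp before applying ActivationScale. The arithmetic sketch below illustrates that clamping, assuming Knative-style bounds of ceil(MaxScaleUpRate x readyPods) and floor(readyPods / MaxScaleDownRate); the concrete numbers are illustrative and not taken from this commit.

```go
// Illustrative arithmetic only; the bounds follow the Knative convention and
// the concrete values are assumptions, not values from this commit.
package main

import (
	"fmt"
	"math"
)

func main() {
	readyPods := 5.0        // currently ready pods
	targetValue := 10.0     // DeciderSpec.TargetValue (assumed)
	observedStable := 120.0 // stable-window metric value
	maxScaleUpRate := 1.5   // DeciderSpec.MaxScaleUpRate
	maxScaleDownRate := 2.0 // DeciderSpec.MaxScaleDownRate (assumed)

	// Raw desired count from the stable window: ceil(120 / 10) = 12 pods.
	dspc := math.Ceil(observedStable / targetValue)

	// Rate limits relative to the ready pods: at most ceil(1.5*5) = 8 pods after
	// one step up, at least floor(5/2) = 2 pods after one step down.
	maxScaleUp := math.Ceil(maxScaleUpRate * readyPods)
	maxScaleDown := math.Floor(readyPods / maxScaleDownRate)

	// Same clamping expression as in the diff: min(max(dspc, maxScaleDown), maxScaleUp).
	desired := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
	fmt.Println(desired) // 8: the raw 12 is capped by the scale-up rate limit
}
```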

pkg/controller/podautoscaler/scaler/kpa_test.go

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import (
 )
 
 func TestScale(t *testing.T) {
-    kpaScaler, err := NewKpaScaler(5,
+    kpaScaler, err := NewKpaAutoscaler(5,
         &DeciderSpec{
             MaxScaleUpRate:   1.5,
             MaxScaleDownRate: 0.75,
@@ -39,7 +39,7 @@ func TestScale(t *testing.T) {
         time.Time{}, 10, aggregation.NewTimeWindow(30*time.Second, 1*time.Second),
     )
     if err != nil {
-        t.Errorf("Failed to create KpaScaler: %v", err)
+        t.Errorf("Failed to create KpaAutoscaler: %v", err)
     }
 
     observedStableValue := 120.0
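
The visible part of the test ends at the observed value; a plausible continuation of TestScale, assuming the ScaleResult fields used by the controller, would feed that value into the renamed scaler and sanity-check the result:

```go
// Hypothetical continuation of TestScale, not part of this commit; the panic
// value and the assertions are assumptions for illustration.
observedPanicValue := 150.0
result := kpaScaler.Scale(observedStableValue, observedPanicValue, time.Now())
if !result.ScaleValid {
	t.Fatalf("expected a valid scale recommendation")
}
if result.DesiredPodCount <= 0 {
	t.Errorf("expected a positive desired pod count, got %d", result.DesiredPodCount)
}
```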

pkg/controller/podautoscaler/scaler/scaler.go

Lines changed: 2 additions & 16 deletions
@@ -13,23 +13,9 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-// ReplicasScaler bundles all needed information to calculate the target amount of replicas
-type ReplicasScaler struct {
-    metricsClient  metrics.MetricsClient
-    resourceClient client.Client
-    // put necessary configuration information there.
-}
-
-func newReplicasScaler(metricsClient metrics.MetricsClient, client client.Client) *ReplicasScaler {
-    return &ReplicasScaler{
-        metricsClient:  metricsClient,
-        resourceClient: client,
-    }
-}
-
-func (c *ReplicasScaler) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) {
+func (a *Autoscaler) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) {
     podList := &v1.PodList{}
-    if err := c.resourceClient.List(context.Background(), podList,
+    if err := a.resourceClient.List(context.Background(), podList,
         &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
         return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
     }
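
With ReplicasScaler removed, getReadyPodsCount now hangs off Autoscaler and lists pods through its resourceClient. A hypothetical caller inside the scaler package might build the labels.Selector like this; the helper name and label key are assumptions, not part of this commit.

```go
// readyPodsFor is a hypothetical helper, not in this commit, showing how a
// labels.Selector is constructed for the relocated getReadyPodsCount.
// It assumes the existing k8s.io/apimachinery/pkg/labels import in this file.
func (a *Autoscaler) readyPodsFor(namespace, appLabel string) (int64, error) {
	selector := labels.SelectorFromSet(labels.Set{"app": appLabel})
	return a.getReadyPodsCount(namespace, selector)
}
```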
