Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions pkg/controllers/options/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
// TaggingControllerOptions contains the inputs that can
// be used in the tagging controller
type TaggingControllerOptions struct {
Tags map[string]string
Resources []string
RateLimit float64
BurstLimit int
Tags map[string]string
Resources []string
RateLimit float64
BurstLimit int
WorkerCount int
}

// AddFlags add the additional flags for the controller
Expand All @@ -35,6 +36,8 @@ func (o *TaggingControllerOptions) AddFlags(fs *pflag.FlagSet) {
"Steady-state rate limit (per sec) at which the controller processes items in its queue. A value of zero (default) disables rate limiting.")
fs.IntVar(&o.BurstLimit, "tagging-controller-burst-limit", o.BurstLimit,
"Burst limit at which the controller processes items in its queue. A value of zero (default) disables rate limiting.")
fs.IntVar(&o.WorkerCount, "tagging-controller-concurrent-node-syncs", 1,
"The number of workers concurrently synchronizing nodes")
}

// Validate checks for errors from user input
Expand All @@ -55,6 +58,10 @@ func (o *TaggingControllerOptions) Validate() error {
return fmt.Errorf("--tagging-controller-burst-limit should not be less than zero")
}

if o.WorkerCount <= 0 {
return fmt.Errorf("--tagging-controller-concurrent-node-syncs must be a positive number")
}

for _, r := range o.Resources {
for _, resource := range SupportedResources {
if r != resource {
Expand Down
9 changes: 7 additions & 2 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Controller struct {
resources []string

rateLimitEnabled bool
workerCount int
}

// NewTaggingController creates a NewTaggingController object
Expand All @@ -112,7 +113,8 @@ func NewTaggingController(
tags map[string]string,
resources []string,
rateLimit float64,
burstLimit int) (*Controller, error) {
burstLimit int,
workerCount int) (*Controller, error) {

awsCloud, ok := cloud.(*awsv1.Cloud)
if !ok {
Expand Down Expand Up @@ -149,6 +151,7 @@ func NewTaggingController(
nodesSynced: nodeInformer.Informer().HasSynced,
nodeMonitorPeriod: nodeMonitorPeriod,
rateLimitEnabled: rateLimitEnabled,
workerCount: workerCount,
}

// Use shared informer to listen to add/update/delete of nodes. Note that any nodes
Expand Down Expand Up @@ -194,7 +197,9 @@ func (tc *Controller) Run(stopCh <-chan struct{}) {
}

klog.Infof("Starting the tagging controller")
go wait.Until(tc.work, tc.nodeMonitorPeriod, stopCh)
for i := 0; i < tc.workerCount; i++ {
go wait.Until(tc.work, tc.nodeMonitorPeriod, stopCh)
}

<-stopCh
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/tagging/tagging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestMultipleEnqueues(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0, 10)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/tagging/tagging_controller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (tc *ControllerWrapper) startTaggingController(ctx context.Context, initCon
tc.Options.Tags,
tc.Options.Resources,
tc.Options.RateLimit,
tc.Options.BurstLimit)
tc.Options.BurstLimit,
tc.Options.WorkerCount)

if err != nil {
klog.Warningf("failed to start tagging controller: %s", err)
Expand Down