@@ -14,6 +14,7 @@ limitations under the License.
1414package tagging
1515
1616import (
17+ "context"
1718 "crypto/md5"
1819 "fmt"
1920 "sort"
@@ -185,7 +186,7 @@ func NewTaggingController(
185186
186187// Run will start the controller to tag resources attached to the cluster
187188// and untag resources detached from the cluster.
188- func (tc * Controller ) Run (stopCh <- chan struct {}) {
189+ func (tc * Controller ) Run (ctx context. Context , stopCh <- chan struct {}) {
189190 defer utilruntime .HandleCrash ()
190191 defer tc .workqueue .ShutDown ()
191192
@@ -198,22 +199,22 @@ func (tc *Controller) Run(stopCh <-chan struct{}) {
198199
199200 klog .Infof ("Starting the tagging controller" )
200201 for i := 0 ; i < tc .workerCount ; i ++ {
201- go wait .Until (tc .work , tc .nodeMonitorPeriod , stopCh )
202+ go wait .Until (func () { tc .work ( ctx ) } , tc .nodeMonitorPeriod , stopCh )
202203 }
203204
204205 <- stopCh
205206}
206207
207208// work is a long-running function that continuously
208209// call process() for each message on the workqueue
209- func (tc * Controller ) work () {
210- for tc .process () {
210+ func (tc * Controller ) work (ctx context. Context ) {
211+ for tc .process (ctx ) {
211212 }
212213}
213214
214215// process reads each message in the queue and performs either
215216// tag or untag function on the Node object
216- func (tc * Controller ) process () bool {
217+ func (tc * Controller ) process (ctx context. Context ) bool {
217218 obj , shutdown := tc .workqueue .Get ()
218219 if shutdown {
219220 return false
@@ -247,12 +248,12 @@ func (tc *Controller) process() bool {
247248 return nil
248249 }
249250 if workItem .action == addTag {
250- err = tc .tagNodesResources (& taggingControllerNode {
251+ err = tc .tagNodesResources (ctx , & taggingControllerNode {
251252 name : workItem .name ,
252253 providerID : workItem .providerID ,
253254 })
254255 } else {
255- err = tc .untagNodeResources (& taggingControllerNode {
256+ err = tc .untagNodeResources (ctx , & taggingControllerNode {
256257 name : workItem .name ,
257258 providerID : workItem .providerID ,
258259 })
@@ -287,7 +288,7 @@ func (tc *Controller) process() bool {
287288
288289// tagNodesResources tag node resources
289290// If we want to tag more resources, modify this function appropriately
290- func (tc * Controller ) tagNodesResources (node * taggingControllerNode ) error {
291+ func (tc * Controller ) tagNodesResources (ctx context. Context , node * taggingControllerNode ) error {
291292 for _ , resource := range tc .resources {
292293 switch resource {
293294 case opt .Instance :
@@ -299,7 +300,7 @@ func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
299300 }
300301 return err
301302 }
302- err = tc .tagEc2Instance (v1node )
303+ err = tc .tagEc2Instance (ctx , v1node )
303304 if err != nil {
304305 return err
305306 }
@@ -311,15 +312,15 @@ func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
311312
312313// tagEc2Instances applies the provided tags to each EC2 instance in
313314// the cluster.
314- func (tc * Controller ) tagEc2Instance (node * v1.Node ) error {
315+ func (tc * Controller ) tagEc2Instance (ctx context. Context , node * v1.Node ) error {
315316 if ! tc .isTaggingRequired (node ) {
316317 klog .Infof ("Skip tagging node %s since it was already tagged earlier." , node .GetName ())
317318 return nil
318319 }
319320
320321 instanceID , _ := awsv1 .KubernetesInstanceID (node .Spec .ProviderID ).MapToAWSInstanceID ()
321322
322- err := tc .cloud .TagResource (string (instanceID ), tc .tags )
323+ err := tc .cloud .TagResource (ctx , string (instanceID ), tc .tags )
323324
324325 if err != nil {
325326 if awsv1 .IsAWSErrorInstanceNotFound (err ) {
@@ -352,11 +353,11 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {
352353
353354// untagNodeResources untag node resources
354355// If we want to untag more resources, modify this function appropriately
355- func (tc * Controller ) untagNodeResources (node * taggingControllerNode ) error {
356+ func (tc * Controller ) untagNodeResources (ctx context. Context , node * taggingControllerNode ) error {
356357 for _ , resource := range tc .resources {
357358 switch resource {
358359 case opt .Instance :
359- err := tc .untagEc2Instance (node )
360+ err := tc .untagEc2Instance (ctx , node )
360361 if err != nil {
361362 return err
362363 }
@@ -368,10 +369,10 @@ func (tc *Controller) untagNodeResources(node *taggingControllerNode) error {
368369
369370// untagEc2Instances deletes the provided tags to each EC2 instances in
370371// the cluster.
371- func (tc * Controller ) untagEc2Instance (node * taggingControllerNode ) error {
372+ func (tc * Controller ) untagEc2Instance (ctx context. Context , node * taggingControllerNode ) error {
372373 instanceID , _ := awsv1 .KubernetesInstanceID (node .providerID ).MapToAWSInstanceID ()
373374
374- err := tc .cloud .UntagResource (string (instanceID ), tc .tags )
375+ err := tc .cloud .UntagResource (ctx , string (instanceID ), tc .tags )
375376
376377 if err != nil {
377378 klog .Errorf ("Error in untagging EC2 instance %s for node %s, error: %v" , instanceID , node .name , err )
0 commit comments