Skip to content

Commit cad9d57

Browse files
authored
Merge pull request #3296 from sivchari/fix-goroutine-leak
fix goroutine leak
2 parents e53e9c9 + 340f74d commit cad9d57

File tree

5 files changed

+55
-34
lines changed

5 files changed

+55
-34
lines changed

kinder/pkg/cluster/manager/actions/kubeadm-init.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package actions
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"fmt"
2223
"log"
2324
"net"
@@ -226,7 +227,7 @@ func postInit(c *status.Cluster, wait time.Duration) error {
226227
// return errors.Wrap(err, "failed to add default storage class")
227228
//}
228229

229-
if err := waitNewControlPlaneNodeReady(c, cp1, wait); err != nil {
230+
if err := waitNewControlPlaneNodeReady(context.Background(), c, cp1, wait); err != nil {
230231
return err
231232
}
232233

kinder/pkg/cluster/manager/actions/kubeadm-join.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package actions
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"time"
2223

@@ -83,7 +84,7 @@ func joinControlPlanes(c *status.Cluster, usePhases bool, copyCertsMode CopyCert
8384
return err
8485
}
8586

86-
if err := waitNewControlPlaneNodeReady(c, cp2, wait); err != nil {
87+
if err := waitNewControlPlaneNodeReady(context.Background(), c, cp2, wait); err != nil {
8788
return err
8889
}
8990
}
@@ -189,7 +190,7 @@ func joinWorkers(c *status.Cluster, usePhases bool, discoveryMode DiscoveryMode,
189190
return err
190191
}
191192

192-
if err := waitNewWorkerNodeReady(c, w, wait); err != nil {
193+
if err := waitNewWorkerNodeReady(context.Background(), c, w, wait); err != nil {
193194
return err
194195
}
195196
}

kinder/pkg/cluster/manager/actions/kubeadm-upgrade.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package actions
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"path/filepath"
2223
"time"
@@ -220,7 +221,7 @@ func kubeadmUpgradeApply(c *status.Cluster, cp1 *status.Node, configVersion stri
220221
return err
221222
}
222223

223-
if err := waitControlPlaneUpgraded(c, cp1, upgradeVersion, wait); err != nil {
224+
if err := waitControlPlaneUpgraded(context.Background(), c, cp1, upgradeVersion, wait); err != nil {
224225
return err
225226
}
226227

@@ -231,7 +232,7 @@ func kubeadmUpgradeNode(c *status.Cluster, n *status.Node, configVersion string,
231232
// waitKubeletHasRBAC waits for the kubelet to have access to the expected config map
232233
// please note that this is a temporary workaround for a problem we are observing on upgrades while
233234
// executing node upgrades immediately after control-plane upgrade.
234-
if err := waitKubeletHasRBAC(c, n, upgradeVersion, wait); err != nil {
235+
if err := waitKubeletHasRBAC(context.Background(), c, n, upgradeVersion, wait); err != nil {
235236
return err
236237
}
237238

@@ -255,7 +256,7 @@ func kubeadmUpgradeNode(c *status.Cluster, n *status.Node, configVersion string,
255256
}
256257

257258
if n.IsControlPlane() {
258-
if err := waitControlPlaneUpgraded(c, n, upgradeVersion, wait); err != nil {
259+
if err := waitControlPlaneUpgraded(context.Background(), c, n, upgradeVersion, wait); err != nil {
259260
return err
260261
}
261262
}
@@ -302,7 +303,7 @@ func upgradeKubeletKubectl(c *status.Cluster, n *status.Node, upgradeVersion *ve
302303
return err
303304
}
304305

305-
if err := waitKubeletUpgraded(c, n, upgradeVersion, wait); err != nil {
306+
if err := waitKubeletUpgraded(context.Background(), c, n, upgradeVersion, wait); err != nil {
306307
return err
307308
}
308309

kinder/pkg/cluster/manager/actions/smoke-test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package actions
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"strings"
2223
"time"
@@ -46,7 +47,7 @@ func SmokeTest(c *status.Cluster, wait time.Duration) error {
4647
return err
4748
}
4849

49-
if err := waitForPodsRunning(c, cp1, wait, "nginx", 1); err != nil {
50+
if err := waitForPodsRunning(context.Background(), c, cp1, wait, "nginx", 1); err != nil {
5051
return err
5152
}
5253

@@ -67,7 +68,7 @@ func SmokeTest(c *status.Cluster, wait time.Duration) error {
6768
}
6869

6970
for _, n := range c.K8sNodes() {
70-
err = waitForNodePort(c, n, 30*time.Second, nodePort)
71+
err = waitForNodePort(context.Background(), c, n, 30*time.Second, nodePort)
7172
if err != nil {
7273
return err
7374
}

kinder/pkg/cluster/manager/actions/waiter.go

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package actions
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"math/rand"
2223
"regexp"
@@ -30,9 +31,9 @@ import (
3031
)
3132

3233
// waitNewControlPlaneNodeReady waits for a new control plane node reaching the target state after init/join
33-
func waitNewControlPlaneNodeReady(c *status.Cluster, n *status.Node, wait time.Duration) error {
34+
func waitNewControlPlaneNodeReady(ctx context.Context, c *status.Cluster, n *status.Node, wait time.Duration) error {
3435
n.Infof("waiting for Node and control-plane Pods to become Ready (timeout %s)", wait)
35-
if pass := waitFor(c, n, wait,
36+
if pass := waitFor(ctx, c, n, wait,
3637
nodeIsReady,
3738
staticPodIsReady("kube-apiserver"),
3839
staticPodIsReady("kube-controller-manager"),
@@ -44,8 +45,8 @@ func waitNewControlPlaneNodeReady(c *status.Cluster, n *status.Node, wait time.D
4445
return nil
4546
}
4647

47-
func waitForPodsRunning(c *status.Cluster, n *status.Node, wait time.Duration, label string, replicas int) error {
48-
if pass := waitFor(c, n, wait,
48+
func waitForPodsRunning(ctx context.Context, c *status.Cluster, n *status.Node, wait time.Duration, label string, replicas int) error {
49+
if pass := waitFor(ctx, c, n, wait,
4950
podsAreRunning(n, label, replicas),
5051
); !pass {
5152
return errors.New("timeout: Node and control-plane did not reach target state")
@@ -55,9 +56,9 @@ func waitForPodsRunning(c *status.Cluster, n *status.Node, wait time.Duration, l
5556
}
5657

5758
// waitForNodePort waits for a nodePort to become ready
58-
func waitForNodePort(c *status.Cluster, n *status.Node, wait time.Duration, nodePort string) error {
59+
func waitForNodePort(ctx context.Context, c *status.Cluster, n *status.Node, wait time.Duration, nodePort string) error {
5960
n.Infof("waiting for NodePort %q to become ready (timeout %s)", nodePort, wait)
60-
if pass := waitFor(c, n, wait,
61+
if pass := waitFor(ctx, c, n, wait,
6162
nodePortIsReady(n, nodePort),
6263
); !pass {
6364
return errors.New("timeout: NodePort not ready")
@@ -67,9 +68,9 @@ func waitForNodePort(c *status.Cluster, n *status.Node, wait time.Duration, node
6768
}
6869

6970
// waitNewWorkerNodeReady waits for a new worker node reaching the target state after join
70-
func waitNewWorkerNodeReady(c *status.Cluster, n *status.Node, wait time.Duration) error {
71+
func waitNewWorkerNodeReady(ctx context.Context, c *status.Cluster, n *status.Node, wait time.Duration) error {
7172
n.Infof("waiting for Node to become Ready (timeout %s)", wait)
72-
if pass := waitFor(c, n, wait,
73+
if pass := waitFor(ctx, c, n, wait,
7374
nodeIsReady,
7475
); !pass {
7576
return errors.New("timeout: Node did not reach target state")
@@ -79,11 +80,11 @@ func waitNewWorkerNodeReady(c *status.Cluster, n *status.Node, wait time.Duratio
7980
}
8081

8182
// waitControlPlaneUpgraded waits for a control plane node reaching the target state after upgrade
82-
func waitControlPlaneUpgraded(c *status.Cluster, n *status.Node, upgradeVersion *K8sVersion.Version, wait time.Duration) error {
83+
func waitControlPlaneUpgraded(ctx context.Context, c *status.Cluster, n *status.Node, upgradeVersion *K8sVersion.Version, wait time.Duration) error {
8384
version := kubernetesVersionToImageTag(upgradeVersion.String())
8485

8586
n.Infof("waiting for control-plane Pods to restart with the new version (timeout %s)", wait)
86-
if pass := waitFor(c, n, wait,
87+
if pass := waitFor(ctx, c, n, wait,
8788
staticPodHasVersion("kube-apiserver", version),
8889
staticPodHasVersion("kube-controller-manager", version),
8990
staticPodHasVersion("kube-scheduler", version),
@@ -95,11 +96,11 @@ func waitControlPlaneUpgraded(c *status.Cluster, n *status.Node, upgradeVersion
9596
}
9697

9798
// waitKubeletUpgraded waits for a node reaching the target state after upgrade
98-
func waitKubeletUpgraded(c *status.Cluster, n *status.Node, upgradeVersion *K8sVersion.Version, wait time.Duration) error {
99+
func waitKubeletUpgraded(ctx context.Context, c *status.Cluster, n *status.Node, upgradeVersion *K8sVersion.Version, wait time.Duration) error {
99100
version := upgradeVersion.String()
100101

101102
n.Infof("waiting for node to restart with the new version (timeout %s)", wait)
102-
if pass := waitFor(c, n, wait,
103+
if pass := waitFor(ctx, c, n, wait,
103104
nodeHasKubernetesVersion(version),
104105
); !pass {
105106
return errors.New("timeout: node did not reach target state")
@@ -111,9 +112,9 @@ func waitKubeletUpgraded(c *status.Cluster, n *status.Node, upgradeVersion *K8sV
111112
// waitKubeletHasRBAC waits for the kubelet to have access to the expected config map
112113
// please note that this is a temporary workaround for a problem we are observing on upgrades while
113114
// executing node upgrades immediately after control-plane upgrade.
114-
func waitKubeletHasRBAC(c *status.Cluster, n *status.Node, upgradeVersion *K8sVersion.Version, wait time.Duration) error {
115+
func waitKubeletHasRBAC(ctx context.Context, c *status.Cluster, n *status.Node, upgradeVersion *K8sVersion.Version, wait time.Duration) error {
115116
n.Infof("waiting for kubelet RBAC validation - workaround (timeout %s)", wait)
116-
if pass := waitFor(c, n, wait,
117+
if pass := waitFor(ctx, c, n, wait,
117118
kubeletHasRBAC(upgradeVersion.Major(), upgradeVersion.Minor()),
118119
); !pass {
119120
return errors.New("timeout: Node did not reach target state")
@@ -127,18 +128,18 @@ type try func(*status.Cluster, *status.Node) bool
127128

128129
// waitFor implements the waiter core logic that is responsible for testing all the given conditions
129130
// until they are satisfied or a timeout is reached
130-
func waitFor(c *status.Cluster, n *status.Node, timeout time.Duration, conditions ...try) bool {
131+
func waitFor(ctx context.Context, c *status.Cluster, n *status.Node, timeout time.Duration, conditions ...try) bool {
131132
// if timeout is 0 or no conditions are defined, exit fast
132133
if timeout == time.Duration(0) {
133134
fmt.Println("Timeout set 0, skipping wait")
134135
return true
135136
}
136137

137-
// sets the timeout timer
138-
timer := time.NewTimer(timeout)
138+
ctx, cancel := context.WithTimeout(ctx, timeout)
139+
defer cancel()
139140

140141
// runs all the conditions in parallel
141-
pass := make(chan bool)
142+
pass := make(chan struct{}, len(conditions))
142143
for _, wc := range conditions {
143144
// clone the condition func to make the closure point to right value
144145
// even after the for loop moves to the next condition
@@ -147,15 +148,33 @@ func waitFor(c *status.Cluster, n *status.Node, timeout time.Duration, condition
147148
// run the condition in a go routine until it pass
148149
go func() {
149150
// creates an arbitrary skew before starting a wait loop
150-
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
151+
timer := time.NewTimer(time.Duration(rand.Intn(500)) * time.Millisecond)
152+
defer timer.Stop()
153+
154+
select {
155+
case <-ctx.Done():
156+
return
157+
case <-timer.C:
158+
}
151159

152160
for {
161+
select {
162+
case <-ctx.Done():
163+
return
164+
default:
165+
}
166+
153167
if x(c, n) {
154-
<-pass
168+
pass <- struct{}{}
155169
break
156170
}
157171
// add a little delay + jitter before retry
158-
time.Sleep(1*time.Second + time.Duration(rand.Intn(500))*time.Millisecond)
172+
timer.Reset(1*time.Second + time.Duration(rand.Intn(500))*time.Millisecond)
173+
select {
174+
case <-ctx.Done():
175+
return
176+
case <-timer.C:
177+
}
159178
}
160179
}()
161180
}
@@ -164,14 +183,12 @@ func waitFor(c *status.Cluster, n *status.Node, timeout time.Duration, condition
164183
passed := 0
165184
for {
166185
select {
167-
case pass <- true:
186+
case <-pass:
168187
passed++
169188
if passed == len(conditions) {
170189
return true
171190
}
172-
case <-timer.C:
173-
// close the channel if timeout occurs, this will release all the blocked receives
174-
close(pass)
191+
case <-ctx.Done():
175192
return false
176193
}
177194
}

0 commit comments

Comments
 (0)