Skip to content

Commit 81bdffc

Browse files
tnozickabertinatto
authored andcommitted
UPSTREAM: <carry>: Release lock on KCM and KS termination
UPSTREAM: <carry>: Force releasing the lock on exit for KS squash with UPSTREAM: <carry>: Release lock on KCM and KS termination OpenShift-Rebase-Source: fc91252 UPSTREAM: <carry>: Release lock on KCM and KS termination
1 parent 4905bdc commit 81bdffc

File tree

4 files changed

+104
-12
lines changed

4 files changed

+104
-12
lines changed

cmd/kube-controller-manager/app/controllermanager.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"k8s.io/apimachinery/pkg/util/sets"
3939
"k8s.io/apimachinery/pkg/util/uuid"
4040
"k8s.io/apimachinery/pkg/util/wait"
41+
"k8s.io/apiserver/pkg/server"
4142
"k8s.io/apiserver/pkg/server/healthz"
4243
"k8s.io/apiserver/pkg/server/mux"
4344
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -158,7 +159,8 @@ controller, and serviceaccounts controller.`,
158159
fg.(featuregate.MutableFeatureGate).AddMetrics()
159160
// add component version metrics
160161
s.ComponentGlobalsRegistry.AddMetrics()
161-
return Run(ctx, c.Complete())
162+
stopCh := server.SetupSignalHandler()
163+
return Run(context.Background(), c.Complete(), stopCh)
162164
},
163165
Args: func(cmd *cobra.Command, args []string) error {
164166
for _, arg := range args {
@@ -195,9 +197,9 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
195197
}
196198

197199
// Run runs the KubeControllerManagerOptions.
198-
func Run(ctx context.Context, c *config.CompletedConfig) error {
200+
func Run(ctx context.Context, c *config.CompletedConfig, stopCh2 <-chan struct{}) error {
199201
logger := klog.FromContext(ctx)
200-
stopCh := ctx.Done()
202+
stopCh := mergeCh(ctx.Done(), stopCh2)
201203

202204
// To help debugging, immediately log version
203205
logger.Info("Starting", "version", utilversion.Get())
@@ -363,10 +365,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
363365
run(ctx, controllerDescriptors)
364366
},
365367
OnStoppedLeading: func() {
366-
logger.Error(nil, "leaderelection lost")
367-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
368+
select {
369+
case <-stopCh:
370+
// We were asked to terminate. Exit 0.
371+
klog.Info("Requested to terminate. Exiting.")
372+
os.Exit(0)
373+
default:
374+
// We lost the lock.
375+
logger.Error(nil, "leaderelection lost")
376+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
377+
}
368378
},
369-
})
379+
}, stopCh)
370380

371381
// If Leader Migration is enabled, proceed to attempt the migration lock.
372382
if leaderMigrator != nil {
@@ -390,10 +400,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
390400
run(ctx, controllerDescriptors)
391401
},
392402
OnStoppedLeading: func() {
393-
logger.Error(nil, "migration leaderelection lost")
394-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
403+
select {
404+
case <-stopCh:
405+
// We were asked to terminate. Exit 0.
406+
klog.Info("Requested to terminate. Exiting.")
407+
os.Exit(0)
408+
default:
409+
// We lost the lock.
410+
logger.Error(nil, "migration leaderelection lost")
411+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
412+
}
395413
},
396-
})
414+
}, stopCh)
397415
}
398416

399417
<-stopCh
@@ -903,7 +921,7 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde
903921

904922
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
905923
// TODO: extract this function into staging/controller-manager
906-
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
924+
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks, stopCh <-chan struct{}) {
907925
logger := klog.FromContext(ctx)
908926
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
909927
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
@@ -919,7 +937,13 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
919937
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
920938
}
921939

922-
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
940+
leCtx, cancel := context.WithCancel(ctx)
941+
defer cancel()
942+
go func() {
943+
<-stopCh
944+
cancel()
945+
}()
946+
leaderelection.RunOrDie(leCtx, leaderelection.LeaderElectionConfig{
923947
Lock: rl,
924948
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
925949
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,

cmd/kube-controller-manager/app/patch.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,17 @@ func (rt *rejectIfNotReadyHeaderRT) RoundTrip(r *http.Request) (*http.Response,
164164
}
165165
return rt.baseRT.RoundTrip(r)
166166
}
167+
168+
// mergeCh takes two stop channels and return a single one that
169+
// closes as soon as one of the inputs closes or receives data.
170+
func mergeCh(stopCh1, stopCh2 <-chan struct{}) <-chan struct{} {
171+
merged := make(chan struct{})
172+
go func() {
173+
defer close(merged)
174+
select {
175+
case <-stopCh1:
176+
case <-stopCh2:
177+
}
178+
}()
179+
return merged
180+
}

cmd/kube-controller-manager/app/patch_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,56 @@ type fakeRTFunc func(r *http.Request) (*http.Response, error)
7272
func (rt fakeRTFunc) RoundTrip(r *http.Request) (*http.Response, error) {
7373
return rt(r)
7474
}
75+
76+
func TestMergeCh(t *testing.T) {
77+
testCases := []struct {
78+
name string
79+
chan1 chan struct{}
80+
chan2 chan struct{}
81+
closeFn func(chan struct{}, chan struct{})
82+
}{
83+
{
84+
name: "chan1 gets closed",
85+
chan1: make(chan struct{}),
86+
chan2: make(chan struct{}),
87+
closeFn: func(a, b chan struct{}) {
88+
close(a)
89+
},
90+
},
91+
{
92+
name: "chan2 gets closed",
93+
chan1: make(chan struct{}),
94+
chan2: make(chan struct{}),
95+
closeFn: func(a, b chan struct{}) {
96+
close(b)
97+
},
98+
},
99+
{
100+
name: "both channels get closed",
101+
chan1: make(chan struct{}),
102+
chan2: make(chan struct{}),
103+
closeFn: func(a, b chan struct{}) {
104+
close(a)
105+
close(b)
106+
},
107+
},
108+
{
109+
name: "channel receives data and returned channel is closed",
110+
chan1: make(chan struct{}),
111+
chan2: make(chan struct{}),
112+
closeFn: func(a, b chan struct{}) {
113+
a <- struct{}{}
114+
},
115+
},
116+
}
117+
118+
for _, tc := range testCases {
119+
t.Run(tc.name, func(t *testing.T) {
120+
go tc.closeFn(tc.chan1, tc.chan2)
121+
merged := mergeCh(tc.chan1, tc.chan2)
122+
if _, ok := <-merged; ok {
123+
t.Fatalf("expected closed channel, got data")
124+
}
125+
})
126+
}
127+
}

cmd/kube-controller-manager/app/testing/testserver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
123123
go func(ctx context.Context) {
124124
defer close(errCh)
125125

126-
if err := app.Run(ctx, config.Complete()); err != nil {
126+
stopCh := make(chan struct{})
127+
if err := app.Run(ctx, config.Complete(), stopCh); err != nil {
127128
errCh <- err
128129
}
129130
}(ctx)

0 commit comments

Comments
 (0)