Skip to content

Commit 4498d40

Browse files
committed
feat: add lifecycle hooks to the operator framework
1 parent 83b36c5 commit 4498d40

File tree

7 files changed

+84
-7
lines changed

7 files changed

+84
-7
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@
1515
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
1616
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1717
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
18+
import java.io.Closeable;
19+
import java.io.IOException;
20+
import java.util.ArrayList;
1821
import java.util.Arrays;
22+
import java.util.List;
1923
import org.slf4j.Logger;
2024
import org.slf4j.LoggerFactory;
2125

2226
@SuppressWarnings("rawtypes")
23-
public class Operator {
27+
public class Operator implements AutoCloseable {
2428

2529
private static final Logger log = LoggerFactory.getLogger(Operator.class);
2630
private final KubernetesClient k8sClient;
2731
private final ConfigurationService configurationService;
2832
private final ObjectMapper objectMapper;
33+
private final List<Closeable> closeables;
2934

3035
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
3136
this(k8sClient, configurationService, new ObjectMapper());
@@ -38,6 +43,7 @@ public Operator(
3843
this.k8sClient = k8sClient;
3944
this.configurationService = configurationService;
4045
this.objectMapper = objectMapper;
46+
this.closeables = new ArrayList<>();
4147
}
4248

4349
/**
@@ -64,6 +70,18 @@ public void start() {
6470
}
6571
}
6672

73+
/** Stop the operator. */
74+
@Override
75+
public void close() {
76+
for (Closeable closeable : this.closeables) {
77+
try {
78+
closeable.close();
79+
} catch (IOException e) {
80+
log.warn("Error closing {}", closeable, e);
81+
}
82+
}
83+
}
84+
6785
/**
6886
* Registers the specified controller with this operator.
6987
*
@@ -163,6 +181,10 @@ public <R extends CustomResource> void register(
163181
defaultEventHandler,
164182
configuration.isGenerationAware(),
165183
finalizer);
184+
185+
closeables.add(customResourceEventSource);
186+
closeables.add(eventSourceManager);
187+
166188
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
167189

168190
log.info(
@@ -189,6 +211,7 @@ private CustomResourceEventSource createCustomResourceEventSource(
189211
customResourceCache, client, targetNamespaces, generationAware, finalizer);
190212

191213
customResourceEventSource.setEventHandler(defaultEventHandler);
214+
customResourceEventSource.start();
192215

193216
return customResourceEventSource;
194217
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ public DefaultEventHandler(
7070
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
7171
}
7272

73+
@Override
74+
public void close() {
75+
if (eventSourceManager != null) {
76+
eventSourceManager.close();
77+
}
78+
79+
executor.shutdownNow();
80+
}
81+
7382
public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
7483
this.eventSourceManager = eventSourceManager;
7584
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,24 @@ public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolea
3030
}
3131
}
3232

33+
@Override
34+
public void close() {
35+
try {
36+
lock.lock();
37+
if (retryTimerEventSource != null) {
38+
retryTimerEventSource.close();
39+
}
40+
41+
for (EventSource eventSource : eventSources.values()) {
42+
eventSource.close();
43+
}
44+
45+
eventSources.clear();
46+
} finally {
47+
lock.unlock();
48+
}
49+
}
50+
3351
public void registerCustomResourceEventSource(
3452
CustomResourceEventSource customResourceEventSource) {
3553
customResourceEventSource.addedToEventManager();
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
public interface EventHandler {
3+
import java.io.Closeable;
4+
5+
public interface EventHandler extends Closeable {
46

57
void handleEvent(Event event);
8+
9+
@Override
10+
default void close() {}
611
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
public interface EventSource extends AutoCloseable {
3+
import java.io.Closeable;
4+
5+
public interface EventSource extends Closeable {
46

57
/**
68
* This method is invoked when this {@link EventSource} instance is properly registered to a

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.io.Closeable;
34
import java.util.Map;
45
import java.util.Optional;
56

6-
public interface EventSourceManager {
7+
public interface EventSourceManager extends Closeable {
78

89
/**
910
* Add the {@link EventSource} identified by the given <code>name</code> to the event manager.
@@ -29,4 +30,7 @@ Optional<EventSource> deRegisterCustomResourceFromEventSource(
2930
String name, String customResourceUid);
3031

3132
Map<String, EventSource> getRegisteredEventSources();
33+
34+
@Override
35+
default void close() {}
3236
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
55

66
import io.fabric8.kubernetes.client.CustomResource;
7+
import io.fabric8.kubernetes.client.Watch;
78
import io.fabric8.kubernetes.client.Watcher;
89
import io.fabric8.kubernetes.client.WatcherException;
910
import io.fabric8.kubernetes.client.dsl.MixedOperation;
1011
import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl;
1112
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
1213
import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils;
1314
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
15+
import java.util.ArrayList;
16+
import java.util.List;
1417
import java.util.Map;
1518
import java.util.concurrent.ConcurrentHashMap;
1619
import org.slf4j.Logger;
@@ -28,6 +31,7 @@ public class CustomResourceEventSource extends AbstractEventSource
2831
private final boolean generationAware;
2932
private final String resourceFinalizer;
3033
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
34+
private final List<Watch> watches;
3135

3236
public static CustomResourceEventSource customResourceEventSourceForAllNamespaces(
3337
CustomResourceCache customResourceCache,
@@ -59,6 +63,7 @@ private CustomResourceEventSource(
5963
this.targetNamespaces = targetNamespaces;
6064
this.generationAware = generationAware;
6165
this.resourceFinalizer = resourceFinalizer;
66+
this.watches = new ArrayList<>();
6267
}
6368

6469
private boolean isWatchAllNamespaces() {
@@ -72,17 +77,28 @@ public void addedToEventManager() {
7277
private void registerWatch() {
7378
CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client;
7479
if (isWatchAllNamespaces()) {
75-
crClient.inAnyNamespace().watch(this);
80+
watches.add(crClient.inAnyNamespace().watch(this));
7681
} else if (targetNamespaces.length == 0) {
77-
client.watch(this);
82+
watches.add(client.watch(this));
7883
} else {
7984
for (String targetNamespace : targetNamespaces) {
80-
crClient.inNamespace(targetNamespace).watch(this);
85+
watches.add(crClient.inNamespace(targetNamespace).watch(this));
8186
log.debug("Registered controller for namespace: {}", targetNamespace);
8287
}
8388
}
8489
}
8590

91+
@Override
92+
public void close() {
93+
for (Watch watch : this.watches) {
94+
try {
95+
watch.close();
96+
} catch (Exception e) {
97+
log.warn("Error closing watcher {}", watch, e);
98+
}
99+
}
100+
}
101+
86102
@Override
87103
public void eventReceived(Watcher.Action action, CustomResource customResource) {
88104
log.debug(

0 commit comments

Comments
 (0)