diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 5d440193df..aac6a08289 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -271,7 +271,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) } private void cleanupAfterDeletedEvent(CustomResourceID customResourceUid) { - eventSourceManager.cleanup(customResourceUid); + eventSourceManager.cleanupForCustomResource(customResourceUid); eventBuffer.cleanup(customResourceUid); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 8142544c01..ef76244de3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,11 +1,6 @@ package io.javaoperatorsdk.operator.processing.event; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.*; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -22,24 +17,22 @@ public class DefaultEventSourceManager> implements EventSourceManager { - public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; - public static final String CUSTOM_RESOURCE_EVENT_SOURCE_NAME = "custom-resource-event-source"; private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); private final ReentrantLock lock = new ReentrantLock(); - private final Map eventSources = new ConcurrentHashMap<>(); + private final Set eventSources = Collections.synchronizedSet(new HashSet<>()); private DefaultEventHandler defaultEventHandler; private TimerEventSource retryTimerEventSource; + private CustomResourceEventSource customResourceEventSource; DefaultEventSourceManager(DefaultEventHandler defaultEventHandler) { init(defaultEventHandler); } public DefaultEventSourceManager(ConfiguredController controller) { - CustomResourceEventSource customResourceEventSource = - new CustomResourceEventSource<>(controller); + customResourceEventSource = new CustomResourceEventSource<>(controller); init(new DefaultEventHandler<>(controller, customResourceEventSource)); - registerEventSource(CUSTOM_RESOURCE_EVENT_SOURCE_NAME, customResourceEventSource); + registerEventSource(customResourceEventSource); } private void init(DefaultEventHandler defaultEventHandler) { @@ -47,29 +40,26 @@ private void init(DefaultEventHandler defaultEventHandler) { defaultEventHandler.setEventSourceManager(this); this.retryTimerEventSource = new TimerEventSource<>(); - registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource); + registerEventSource(retryTimerEventSource); } @Override public void close() { + lock.lock(); try { - lock.lock(); - try { defaultEventHandler.close(); } catch (Exception e) { log.warn("Error closing event handler", e); } - - for (var entry : eventSources.entrySet()) { + log.debug("Closing event sources."); + for (var eventSource : eventSources) { try { - log.debug("Closing {} -> {}", entry.getKey(), entry.getValue()); - entry.getValue().close(); + eventSource.close(); } catch (Exception e) { - log.warn("Error closing {} -> {}", entry.getKey(), entry.getValue(), e); + log.warn("Error closing {} -> {}", eventSource); } } - eventSources.clear(); } finally { lock.unlock(); @@ -77,17 +67,12 @@ public void close() { } @Override - public final void registerEventSource(String name, EventSource eventSource) + public final void registerEventSource(EventSource eventSource) throws OperatorException { Objects.requireNonNull(eventSource, "EventSource must not be null"); - + lock.lock(); try { - lock.lock(); - if (eventSources.containsKey(name)) { - throw new IllegalStateException( - "Event source with name already registered. Event source name: " + name); - } - eventSources.put(name, eventSource); + eventSources.add(eventSource); eventSource.setEventHandler(defaultEventHandler); eventSource.start(); } catch (Throwable e) { @@ -95,46 +80,18 @@ public final void registerEventSource(String name, EventSource eventSource) // leave untouched throw e; } - throw new OperatorException("Couldn't register event source named '" + name + "'", e); - } finally { - lock.unlock(); - } - } - - @Override - public Optional deRegisterEventSource(String name) { - try { - lock.lock(); - EventSource currentEventSource = eventSources.remove(name); - if (currentEventSource != null) { - try { - currentEventSource.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - return Optional.ofNullable(currentEventSource); + throw new OperatorException( + "Couldn't register event source: " + eventSource.getClass().getName(), e); } finally { lock.unlock(); } } - @Override - public Optional deRegisterCustomResourceFromEventSource( - String eventSourceName, CustomResourceID customResourceUid) { + public void cleanupForCustomResource(CustomResourceID customResourceUid) { + lock.lock(); try { - lock.lock(); - EventSource eventSource = this.eventSources.get(eventSourceName); - if (eventSource == null) { - log.warn( - "Event producer: {} not found for custom resource: {}", - eventSourceName, - customResourceUid); - return Optional.empty(); - } else { - eventSource.eventSourceDeRegisteredForResource(customResourceUid); - return Optional.of(eventSource); + for (EventSource eventSource : this.eventSources) { + eventSource.cleanupForCustomResource(customResourceUid); } } finally { lock.unlock(); @@ -146,19 +103,13 @@ public TimerEventSource getRetryTimerEventSource() { } @Override - public Map getRegisteredEventSources() { - return Collections.unmodifiableMap(eventSources); + public Set getRegisteredEventSources() { + return Collections.unmodifiableSet(eventSources); } @Override public CustomResourceEventSource getCustomResourceEventSource() { - return (CustomResourceEventSource) getRegisteredEventSources() - .get(CUSTOM_RESOURCE_EVENT_SOURCE_NAME); + return customResourceEventSource; } - public void cleanup(CustomResourceID customResourceUid) { - getRegisteredEventSources() - .keySet() - .forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid)); - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java index 1cc05f632c..22187ddb86 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java @@ -20,5 +20,10 @@ default void close() throws IOException {} void setEventHandler(EventHandler eventHandler); - default void eventSourceDeRegisteredForResource(CustomResourceID customResourceUid) {} + /** + * Automatically called when a custom resource is deleted from the cluster. + * + * @param customResourceUid - id of custom resource + */ + default void cleanupForCustomResource(CustomResourceID customResourceUid) {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 86638c2786..7e049e7c51 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -2,8 +2,7 @@ import java.io.Closeable; import java.io.IOException; -import java.util.Map; -import java.util.Optional; +import java.util.Set; import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.OperatorException; @@ -14,29 +13,15 @@ public interface EventSourceManager> extends Clos /** * Add the {@link EventSource} identified by the given name to the event manager. * - * @param name the name of the {@link EventSource} to add * @param eventSource the {@link EventSource} to register * @throws IllegalStateException if an {@link EventSource} with the same name is already * registered. * @throws OperatorException if an error occurred during the registration process */ - void registerEventSource(String name, EventSource eventSource) + void registerEventSource(EventSource eventSource) throws IllegalStateException, OperatorException; - /** - * Remove the {@link EventSource} identified by the given name from the event - * manager. - * - * @param name the name of the {@link EventSource} to remove - * @return an optional {@link EventSource} which would be empty if no {@link EventSource} have - * been registered with the given name. - */ - Optional deRegisterEventSource(String name); - - Optional deRegisterCustomResourceFromEventSource( - String name, CustomResourceID customResourceUid); - - Map getRegisteredEventSources(); + Set getRegisteredEventSources(); CustomResourceEventSource getCustomResourceEventSource(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java index 51f21fc4d4..d4638f5c3b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java @@ -50,7 +50,7 @@ public void scheduleOnce(R customResource, long delay) { } @Override - public void eventSourceDeRegisteredForResource(CustomResourceID customResourceUid) { + public void cleanupForCustomResource(CustomResourceID customResourceUid) { cancelSchedule(customResourceUid); cancelOnceSchedule(customResourceUid); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 620a7c9ac7..023d589e03 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -119,7 +119,7 @@ public void cleanUpAfterDeleteEvent() { waitMinimalTime(); verify(defaultEventSourceManagerMock, times(1)) - .cleanup(CustomResourceID.fromResource(customResource)); + .cleanupForCustomResource(CustomResourceID.fromResource(customResource)); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index 089215486e..ad87000a52 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -1,7 +1,7 @@ package io.javaoperatorsdk.operator.processing.event; import java.io.IOException; -import java.util.Map; +import java.util.Set; import org.junit.jupiter.api.Test; @@ -10,7 +10,6 @@ import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -18,8 +17,6 @@ class DefaultEventSourceManagerTest { - public static final String CUSTOM_EVENT_SOURCE_NAME = "CustomEventSource"; - private DefaultEventHandler defaultEventHandlerMock = mock(DefaultEventHandler.class); private DefaultEventSourceManager defaultEventSourceManager = new DefaultEventSourceManager(defaultEventHandlerMock); @@ -28,12 +25,12 @@ class DefaultEventSourceManagerTest { public void registersEventSource() { EventSource eventSource = mock(EventSource.class); - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + defaultEventSourceManager.registerEventSource(eventSource); - Map registeredSources = + Set registeredSources = defaultEventSourceManager.getRegisteredEventSources(); - assertThat(registeredSources.entrySet()).hasSize(2); - assertThat(registeredSources.get(CUSTOM_EVENT_SOURCE_NAME)).isEqualTo(eventSource); + assertThat(registeredSources).hasSize(2); + verify(eventSource, times(1)).setEventHandler(eq(defaultEventHandlerMock)); verify(eventSource, times(1)).start(); } @@ -42,8 +39,8 @@ public void registersEventSource() { public void closeShouldCascadeToEventSources() throws IOException { EventSource eventSource = mock(EventSource.class); EventSource eventSource2 = mock(EventSource.class); - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME + "2", eventSource2); + defaultEventSourceManager.registerEventSource(eventSource); + defaultEventSourceManager.registerEventSource(eventSource2); defaultEventSourceManager.close(); @@ -51,28 +48,16 @@ public void closeShouldCascadeToEventSources() throws IOException { verify(eventSource2, times(1)).close(); } - @Test - public void throwExceptionIfRegisteringEventSourceWithSameName() { - EventSource eventSource = mock(EventSource.class); - EventSource eventSource2 = mock(EventSource.class); - - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy( - () -> defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, - eventSource2)); - } - @Test public void deRegistersEventSources() { CustomResource customResource = TestUtils.testCustomResource(); EventSource eventSource = mock(EventSource.class); - defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + defaultEventSourceManager.registerEventSource(eventSource); - defaultEventSourceManager.deRegisterCustomResourceFromEventSource( - CUSTOM_EVENT_SOURCE_NAME, CustomResourceID.fromResource(customResource)); + defaultEventSourceManager + .cleanupForCustomResource(CustomResourceID.fromResource(customResource)); verify(eventSource, times(1)) - .eventSourceDeRegisteredForResource(eq(CustomResourceID.fromResource(customResource))); + .cleanupForCustomResource(eq(CustomResourceID.fromResource(customResource))); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java index f7f2814255..0d9c3b5a11 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java @@ -61,7 +61,7 @@ public void deRegistersPeriodicalEventSources() { untilAsserted(() -> assertThat(eventHandlerMock.events).hasSizeGreaterThan(1)); timerEventSource - .eventSourceDeRegisteredForResource(CustomResourceID.fromResource(customResource)); + .cleanupForCustomResource(CustomResourceID.fromResource(customResource)); int size = eventHandlerMock.events.size(); untilAsserted(() -> assertThat(eventHandlerMock.events).hasSize(size)); @@ -103,7 +103,7 @@ public void deRegistersOnceEventSources() { timerEventSource.scheduleOnce(customResource, PERIOD); timerEventSource - .eventSourceDeRegisteredForResource(CustomResourceID.fromResource(customResource)); + .cleanupForCustomResource(CustomResourceID.fromResource(customResource)); untilAsserted(() -> assertThat(eventHandlerMock.events).isEmpty()); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java index f262440cae..27b651c9fb 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java @@ -32,7 +32,7 @@ public class EventSourceTestCustomResourceController @Override public void init(EventSourceManager eventSourceManager) { - eventSourceManager.registerEventSource("Timer", timerEventSource); + eventSourceManager.registerEventSource(timerEventSource); } @Override diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java index e5c34e1610..a2284adfa1 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java @@ -37,7 +37,7 @@ public class InformerEventSourceTestCustomResourceController implements public void init(EventSourceManager eventSourceManager) { eventSource = new InformerEventSource<>(kubernetesClient, ConfigMap.class, Mappers.fromAnnotation(RELATED_RESOURCE_UID)); - eventSourceManager.registerEventSource("configmap", eventSource); + eventSourceManager.registerEventSource(eventSource); } @Override