diff --git a/micrometer-support/src/main/java/io/javaoperatorsdk/operator/micrometer/MicrometerMetrics.java b/micrometer-support/src/main/java/io/javaoperatorsdk/operator/micrometer/MicrometerMetrics.java deleted file mode 100644 index 3c8074f8f8..0000000000 --- a/micrometer-support/src/main/java/io/javaoperatorsdk/operator/micrometer/MicrometerMetrics.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.javaoperatorsdk.operator.micrometer; - -import java.util.Collections; -import java.util.Map; - -import io.javaoperatorsdk.operator.Metrics; -import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor; -import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -import io.javaoperatorsdk.operator.processing.event.Event; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; - -public class MicrometerMetrics implements Metrics { - - public static final String PREFIX = "operator.sdk."; - private final MeterRegistry registry; - private final EventMonitor monitor = new EventMonitor() { - @Override - public void processedEvent(CustomResourceID uid, Event event) { - incrementProcessedEventsNumber(); - } - - @Override - public void failedEvent(CustomResourceID uid, Event event) { - incrementControllerRetriesNumber(); - } - }; - - public MicrometerMetrics(MeterRegistry registry) { - this.registry = registry; - } - - public T timeControllerExecution(ControllerExecution execution) { - final var name = execution.controllerName(); - final var execName = PREFIX + "controllers.execution." + execution.name(); - final var timer = - Timer.builder(execName) - .tags("controller", name) - .publishPercentiles(0.3, 0.5, 0.95) - .publishPercentileHistogram() - .register(registry); - try { - final var result = timer.record(execution::execute); - final var successType = execution.successTypeName(result); - registry - .counter(execName + ".success", "controller", name, "type", successType) - .increment(); - return result; - } catch (Exception e) { - final var exception = e.getClass().getSimpleName(); - registry - .counter(execName + ".failure", "controller", name, "exception", exception) - .increment(); - throw e; - } - } - - public void incrementControllerRetriesNumber() { - registry - .counter( - PREFIX + "retry.on.exception", "retry", "retryCounter", "type", - "retryException") - .increment(); - - } - - public void incrementProcessedEventsNumber() { - registry - .counter( - PREFIX + "total.events.received", "events", "totalEvents", "type", - "eventsReceived") - .increment(); - - } - - public > T monitorSizeOf(T map, String name) { - return registry.gaugeMapSize(PREFIX + name + ".size", Collections.emptyList(), map); - } - - @Override - public EventMonitor getEventMonitor() { - return monitor; - } -} diff --git a/micrometer-support/src/main/java/io/javaoperatorsdk/operator/monitoring/micrometer/MicrometerMetrics.java b/micrometer-support/src/main/java/io/javaoperatorsdk/operator/monitoring/micrometer/MicrometerMetrics.java new file mode 100644 index 0000000000..a90460d825 --- /dev/null +++ b/micrometer-support/src/main/java/io/javaoperatorsdk/operator/monitoring/micrometer/MicrometerMetrics.java @@ -0,0 +1,98 @@ +package io.javaoperatorsdk.operator.monitoring.micrometer; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import io.javaoperatorsdk.operator.api.RetryInfo; +import io.javaoperatorsdk.operator.api.monitoring.Metrics; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; + +public class MicrometerMetrics implements Metrics { + + private static final String PREFIX = "operator.sdk."; + private static final String RECONCILIATIONS = "reconciliations."; + private final MeterRegistry registry; + + public MicrometerMetrics(MeterRegistry registry) { + this.registry = registry; + } + + public T timeControllerExecution(ControllerExecution execution) { + final var name = execution.controllerName(); + final var execName = PREFIX + "controllers.execution." + execution.name(); + final var timer = + Timer.builder(execName) + .tags("controller", name) + .publishPercentiles(0.3, 0.5, 0.95) + .publishPercentileHistogram() + .register(registry); + try { + final var result = timer.record(execution::execute); + final var successType = execution.successTypeName(result); + registry + .counter(execName + ".success", "controller", name, "type", successType) + .increment(); + return result; + } catch (Exception e) { + final var exception = e.getClass().getSimpleName(); + registry + .counter(execName + ".failure", "controller", name, "exception", exception) + .increment(); + throw e; + } + } + + public void receivedEvent(Event event) { + incrementCounter(event.getRelatedCustomResourceID(), "events.received", "event", + event.getClass().getSimpleName()); + } + + @Override + public void cleanupDoneFor(CustomResourceID customResourceUid) { + incrementCounter(customResourceUid, "events.delete"); + } + + public void reconcileCustomResource(CustomResourceID customResourceID, + RetryInfo retryInfo) { + incrementCounter(customResourceID, RECONCILIATIONS + "started", + RECONCILIATIONS + "retries.number", "" + retryInfo.getAttemptCount(), + RECONCILIATIONS + "retries.last", "" + retryInfo.isLastAttempt()); + } + + @Override + public void finishedReconciliation(CustomResourceID customResourceID) { + incrementCounter(customResourceID, RECONCILIATIONS + "success"); + } + + public void failedReconciliation(CustomResourceID customResourceID, RuntimeException exception) { + var cause = exception.getCause(); + if (cause == null) { + cause = exception; + } else if (cause instanceof RuntimeException) { + cause = cause.getCause() != null ? cause.getCause() : cause; + } + incrementCounter(customResourceID, RECONCILIATIONS + "failed", "exception", + cause.getClass().getSimpleName()); + } + + public > T monitorSizeOf(T map, String name) { + return registry.gaugeMapSize(PREFIX + name + ".size", Collections.emptyList(), map); + } + + private void incrementCounter(CustomResourceID id, String counterName, String... additionalTags) { + var tags = List.of( + "name", id.getName(), + "name", id.getName(), "namespace", id.getNamespace().orElse(""), + "scope", id.getNamespace().isPresent() ? "namespace" : "cluster"); + if (additionalTags != null && additionalTags.length > 0) { + tags = new LinkedList<>(tags); + tags.addAll(List.of(additionalTags)); + } + registry.counter(PREFIX + counterName, tags.toArray(new String[0])).increment(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/EventListUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/EventListUtils.java deleted file mode 100644 index 0fe18bdae0..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/EventListUtils.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.javaoperatorsdk.operator; - -import java.util.List; - -import io.javaoperatorsdk.operator.processing.event.Event; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; -import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; - -public class EventListUtils { - - public static boolean containsCustomResourceDeletedEvent(List events) { - return events.stream() - .anyMatch( - e -> { - if (e instanceof CustomResourceEvent) { - return ((CustomResourceEvent) e).getAction() == ResourceAction.DELETED; - } else { - return false; - } - }); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java deleted file mode 100644 index a3f3ddccb5..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java +++ /dev/null @@ -1,37 +0,0 @@ -package io.javaoperatorsdk.operator; - -import java.util.Map; - -import io.javaoperatorsdk.operator.processing.DefaultEventHandler; -import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor; - -public interface Metrics { - Metrics NOOP = new Metrics() {}; - - - interface ControllerExecution { - String name(); - - String controllerName(); - - String successTypeName(T result); - - T execute(); - } - - default T timeControllerExecution(ControllerExecution execution) { - return execution.execute(); - } - - default void incrementControllerRetriesNumber() {} - - default void incrementProcessedEventsNumber() {} - - default > T monitorSizeOf(T map, String name) { - return map; - } - - default DefaultEventHandler.EventMonitor getEventMonitor() { - return EventMonitor.NOOP; - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/EventSourceInitializer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/EventSourceInitializer.java new file mode 100644 index 0000000000..e7500b9e47 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/EventSourceInitializer.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.api; + +import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.processing.event.EventSourceManager; + +public interface EventSourceInitializer> { + + /** + * In this typically you might want to register event sources. But can access + * CustomResourceEventSource, what might be handy for some edge cases. + * + * @param eventSourceManager the {@link EventSourceManager} where event sources can be registered. + */ + void prepareEventSources(EventSourceManager eventSourceManager); + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java index 5779cc4eb9..817cb554fc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.api; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; public interface ResourceController { @@ -49,11 +48,4 @@ default DeleteControl deleteResource(R resource, Context context) { */ UpdateControl createOrUpdateResource(R resource, Context context); - /** - * In init typically you might want to register event sources. - * - * @param eventSourceManager the {@link EventSourceManager} which handles this controller and with - * which event sources can be registered - */ - default void init(EventSourceManager eventSourceManager) {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index ec61108bf2..fb53f7eaae 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -6,8 +6,8 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.Metrics; import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.api.monitoring.Metrics; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index f1faef44f8..2cecfb552d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -4,8 +4,8 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.Metrics; import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.api.monitoring.Metrics; public class ConfigurationServiceOverrider { private final ConfigurationService original; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/monitoring/Metrics.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/monitoring/Metrics.java new file mode 100644 index 0000000000..1c3fced1f5 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/monitoring/Metrics.java @@ -0,0 +1,42 @@ +package io.javaoperatorsdk.operator.api.monitoring; + +import java.util.Map; + +import io.javaoperatorsdk.operator.api.RetryInfo; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; +import io.javaoperatorsdk.operator.processing.event.Event; + +public interface Metrics { + Metrics NOOP = new Metrics() {}; + + default void receivedEvent(Event event) {} + + default void reconcileCustomResource(CustomResourceID customResourceID, + RetryInfo retryInfo) {} + + default void failedReconciliation(CustomResourceID customResourceID, + RuntimeException exception) {} + + default void cleanupDoneFor(CustomResourceID customResourceUid) {}; + + default void finishedReconciliation(CustomResourceID resourceID) {}; + + + interface ControllerExecution { + String name(); + + String controllerName(); + + String successTypeName(T result); + + T execute(); + } + + default T timeControllerExecution(ControllerExecution execution) { + return execution.execute(); + } + + default > T monitorSizeOf(T map, String name) { + return map; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java index 3ce90ebb67..0f3bf2d9fd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java @@ -11,23 +11,20 @@ import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.CustomResourceUtils; -import io.javaoperatorsdk.operator.Metrics.ControllerExecution; import io.javaoperatorsdk.operator.MissingCRDException; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.api.Context; -import io.javaoperatorsdk.operator.api.DeleteControl; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.UpdateControl; +import io.javaoperatorsdk.operator.api.*; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; public class ConfiguredController> implements ResourceController, - Closeable { + Closeable, EventSourceInitializer { private final ResourceController controller; private final ControllerConfiguration configuration; private final KubernetesClient kubernetesClient; - private EventSourceManager eventSourceManager; + private DefaultEventSourceManager eventSourceManager; public ConfiguredController(ResourceController controller, ControllerConfiguration configuration, @@ -97,7 +94,7 @@ public UpdateControl execute() { } @Override - public void init(EventSourceManager eventSourceManager) { + public void prepareEventSources(EventSourceManager eventSourceManager) { throw new UnsupportedOperationException("This method should never be called directly"); } @@ -169,7 +166,9 @@ public void start() throws OperatorException { try { eventSourceManager = new DefaultEventSourceManager<>(this); - controller.init(eventSourceManager); + if (controller instanceof EventSourceInitializer) { + ((EventSourceInitializer) controller).prepareEventSources(eventSourceManager); + } } catch (MissingCRDException e) { throwMissingCRDException(crdName, specVersion, controllerName); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 5c21747623..045b24b4c6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -16,6 +16,7 @@ import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; +import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; @@ -37,9 +38,6 @@ public class DefaultEventHandler> implements Even private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class); - @Deprecated - private static EventMonitor monitor = EventMonitor.NOOP; - private final Set underProcessing = new HashSet<>(); private final EventDispatcher eventDispatcher; private final Retry retry; @@ -47,7 +45,7 @@ public class DefaultEventHandler> implements Even private final ExecutorService executor; private final String controllerName; private final ReentrantLock lock = new ReentrantLock(); - private final EventMonitor eventMonitor; + private final Metrics metrics; private volatile boolean running; private final ResourceCache resourceCache; private DefaultEventSourceManager eventSourceManager; @@ -60,7 +58,7 @@ public DefaultEventHandler(ConfiguredController controller, ResourceCache controller.getConfiguration().getName(), new EventDispatcher<>(controller), GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), - controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor(), + controller.getConfiguration().getConfigurationService().getMetrics(), new EventMarker()); } @@ -72,7 +70,7 @@ public DefaultEventHandler(ConfiguredController controller, ResourceCache private DefaultEventHandler(ResourceCache resourceCache, ExecutorService executor, String relatedControllerName, - EventDispatcher eventDispatcher, Retry retry, EventMonitor monitor, + EventDispatcher eventDispatcher, Retry retry, Metrics metrics, EventMarker eventMarker) { this.running = true; this.executor = @@ -84,7 +82,7 @@ private DefaultEventHandler(ResourceCache resourceCache, ExecutorService exec this.eventDispatcher = eventDispatcher; this.retry = retry; this.resourceCache = resourceCache; - this.eventMonitor = monitor != null ? monitor : EventMonitor.NOOP; + this.metrics = metrics != null ? metrics : Metrics.NOOP; this.eventMarker = eventMarker; } @@ -92,29 +90,6 @@ public void setEventSourceManager(DefaultEventSourceManager eventSourceManage this.eventSourceManager = eventSourceManager; } - /* - * TODO: promote this interface to top-level, probably create a `monitoring` package? - */ - public interface EventMonitor { - EventMonitor NOOP = new EventMonitor() { - @Override - public void processedEvent(CustomResourceID uid, Event event) {} - - @Override - public void failedEvent(CustomResourceID uid, Event event) {} - }; - - void processedEvent(CustomResourceID uid, Event event); - - void failedEvent(CustomResourceID uid, Event event); - } - - private EventMonitor monitor() { - // todo: remove us of static monitor, only here for backwards compatibility - return DefaultEventHandler.monitor != EventMonitor.NOOP ? DefaultEventHandler.monitor - : eventMonitor; - } - @Override public void handleEvent(Event event) { lock.lock(); @@ -124,21 +99,22 @@ public void handleEvent(Event event) { log.debug("Skipping event: {} because the event handler is shutting down", event); return; } - final var monitor = monitor(); - monitor.processedEvent(event.getRelatedCustomResourceID(), event); + final var resourceID = event.getRelatedCustomResourceID(); + metrics.receivedEvent(event); handleEventMarking(event); - if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) { - submitReconciliationExecution(event.getRelatedCustomResourceID()); + if (!eventMarker.deleteEventPresent(resourceID)) { + submitReconciliationExecution(resourceID); } else { - cleanupForDeletedEvent(event.getRelatedCustomResourceID()); + cleanupForDeletedEvent(resourceID); } + } finally { lock.unlock(); } } - private boolean submitReconciliationExecution(CustomResourceID customResourceUid) { + private void submitReconciliationExecution(CustomResourceID customResourceUid) { boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid); Optional latestCustomResource = resourceCache.getCustomResource(customResourceUid); @@ -146,14 +122,15 @@ private boolean submitReconciliationExecution(CustomResourceID customResourceUid if (!controllerUnderExecution && latestCustomResource.isPresent()) { setUnderExecutionProcessing(customResourceUid); - ExecutionScope executionScope = - new ExecutionScope( + final var retryInfo = retryInfo(customResourceUid); + ExecutionScope executionScope = + new ExecutionScope<>( latestCustomResource.get(), - retryInfo(customResourceUid)); + retryInfo); eventMarker.unMarkEventReceived(customResourceUid); + metrics.reconcileCustomResource(customResourceUid, retryInfo); log.debug("Executing events for custom resource. Scope: {}", executionScope); executor.execute(new ControllerExecution(executionScope)); - return true; } else { log.debug( "Skipping executing controller for resource id: {}." @@ -165,7 +142,6 @@ private boolean submitReconciliationExecution(CustomResourceID customResourceUid log.warn("no custom resource found in cache for CustomResourceID: {}", customResourceUid); } - return false; } } @@ -201,13 +177,12 @@ void eventProcessingFinished( // Either way we don't want to retry. if (isRetryConfigured() && postExecutionControl.exceptionDuringExecution() && !eventMarker.deleteEventPresent(customResourceID)) { - handleRetryOnException(executionScope); - // todo revisit monitoring since events are not present anymore - // final var monitor = monitor(); executionScope.getEvents().forEach(e -> - // monitor.failedEvent(executionScope.getCustomResourceID(), e)); + handleRetryOnException(executionScope, + postExecutionControl.getRuntimeException().orElseThrow()); return; } cleanupOnSuccessfulExecution(executionScope); + metrics.finishedReconciliation(customResourceID); if (eventMarker.deleteEventPresent(customResourceID)) { cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { @@ -249,14 +224,11 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope execution if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { return true; } - if (cachedCustomResourceVersion.equals(originalResourceVersion)) { - return false; - } // If the cached resource version equals neither the version before nor after execution // probably an update happened on the custom resource independent of the framework during // reconciliation. We cannot tell at this point if it happened before our update or before. // (Well we could if we would parse resource version, but that should not be done by definition) - return true; + return !cachedCustomResourceVersion.equals(originalResourceVersion); } private void reScheduleExecutionIfInstructed(PostExecutionControl postExecutionControl, @@ -271,7 +243,8 @@ private void reScheduleExecutionIfInstructed(PostExecutionControl postExecuti * events (received meanwhile retry is in place or already in buffer) instantly or always wait * according to the retry timing if there was an exception. */ - private void handleRetryOnException(ExecutionScope executionScope) { + private void handleRetryOnException(ExecutionScope executionScope, + RuntimeException exception) { RetryExecution execution = getOrInitRetryExecution(executionScope); var customResourceID = executionScope.getCustomResourceID(); boolean eventPresent = eventMarker.eventPresent(customResourceID); @@ -291,6 +264,7 @@ private void handleRetryOnException(ExecutionScope executionScope) { "Scheduling timer event for retry with delay:{} for resource: {}", delay, customResourceID); + metrics.failedReconciliation(customResourceID, exception); eventSourceManager .getRetryAndRescheduleTimerEventSource() .scheduleOnce(executionScope.getCustomResource(), delay); @@ -322,6 +296,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) private void cleanupForDeletedEvent(CustomResourceID customResourceUid) { eventSourceManager.cleanupForCustomResource(customResourceUid); eventMarker.cleanup(customResourceUid); + metrics.cleanupDoneFor(customResourceUid); } private boolean isControllerUnderExecution(CustomResourceID customResourceUid) { @@ -360,10 +335,17 @@ private ControllerExecution(ExecutionScope executionScope) { @Override public void run() { // change thread name for easier debugging - Thread.currentThread().setName("EventHandler-" + controllerName); - PostExecutionControl postExecutionControl = - eventDispatcher.handleExecution(executionScope); - eventProcessingFinished(executionScope, postExecutionControl); + final var thread = Thread.currentThread(); + final var name = thread.getName(); + try { + thread.setName("EventHandler-" + controllerName); + PostExecutionControl postExecutionControl = + eventDispatcher.handleExecution(executionScope); + eventProcessingFinished(executionScope, postExecutionControl); + } finally { + // restore original name + thread.setName(name); + } } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index d72bd6ec7e..5d900ab5cc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event; +import java.io.Closeable; import java.util.*; import java.util.concurrent.locks.ReentrantLock; @@ -15,7 +16,7 @@ import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; public class DefaultEventSourceManager> - implements EventSourceManager { + implements EventSourceManager, Closeable { private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); @@ -23,7 +24,7 @@ public class DefaultEventSourceManager> private final Set eventSources = Collections.synchronizedSet(new HashSet<>()); private DefaultEventHandler defaultEventHandler; private TimerEventSource retryAndRescheduleTimerEventSource; - private CustomResourceEventSource customResourceEventSource; + private CustomResourceEventSource customResourceEventSource; DefaultEventSourceManager(DefaultEventHandler defaultEventHandler) { init(defaultEventHandler); @@ -57,7 +58,7 @@ public void close() { try { eventSource.close(); } catch (Exception e) { - log.warn("Error closing {} -> {}", eventSource); + log.warn("Error closing {} -> {}", eventSource, e); } } eventSources.clear(); @@ -98,7 +99,7 @@ public void cleanupForCustomResource(CustomResourceID customResourceUid) { } } - public TimerEventSource getRetryAndRescheduleTimerEventSource() { + public TimerEventSource getRetryAndRescheduleTimerEventSource() { return retryAndRescheduleTimerEventSource; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 7e049e7c51..e06ab2e3d1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -1,14 +1,12 @@ package io.javaoperatorsdk.operator.processing.event; -import java.io.Closeable; -import java.io.IOException; import java.util.Set; import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; -public interface EventSourceManager> extends Closeable { +public interface EventSourceManager> { /** * Add the {@link EventSource} identified by the given name to the event manager. @@ -25,6 +23,4 @@ void registerEventSource(EventSource eventSource) CustomResourceEventSource getCustomResourceEventSource(); - @Override - default void close() throws IOException {} } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java index b709c026e1..da32b467e0 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java @@ -10,7 +10,6 @@ import org.mockito.ArgumentMatchers; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.Metrics; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.DeleteControl; @@ -19,6 +18,7 @@ import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index a24c32a547..b8e4381f3b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -9,10 +9,10 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; -import io.javaoperatorsdk.operator.Metrics; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; +import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.ConfiguredController; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.EventHandler; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java index 33e590f75f..caa5ad9fda 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java @@ -7,17 +7,15 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.ControllerUtils; -import io.javaoperatorsdk.operator.api.Context; -import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.UpdateControl; +import io.javaoperatorsdk.operator.api.*; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; @Controller public class EventSourceTestCustomResourceController - implements ResourceController, TestExecutionInfoProvider { + implements ResourceController, EventSourceInitializer, + TestExecutionInfoProvider { public static final String FINALIZER_NAME = ControllerUtils.getDefaultFinalizerName( @@ -31,7 +29,7 @@ public class EventSourceTestCustomResourceController new TimerEventSource<>(); @Override - public void init(EventSourceManager eventSourceManager) { + public void prepareEventSources(EventSourceManager eventSourceManager) { eventSourceManager.registerEventSource(timerEventSource); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java index a2284adfa1..772c93cac3 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomResourceController.java @@ -5,10 +5,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.Context; -import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.UpdateControl; +import io.javaoperatorsdk.operator.api.*; import io.javaoperatorsdk.operator.junit.KubernetesClientAware; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.internal.InformerEventSource; @@ -22,7 +19,8 @@ */ @Controller(finalizerName = NO_FINALIZER) public class InformerEventSourceTestCustomResourceController implements - ResourceController, KubernetesClientAware { + ResourceController, KubernetesClientAware, + EventSourceInitializer { private static final Logger LOGGER = LoggerFactory.getLogger(InformerEventSourceTestCustomResourceController.class); @@ -34,7 +32,7 @@ public class InformerEventSourceTestCustomResourceController implements private InformerEventSource eventSource; @Override - public void init(EventSourceManager eventSourceManager) { + public void prepareEventSources(EventSourceManager eventSourceManager) { eventSource = new InformerEventSource<>(kubernetesClient, ConfigMap.class, Mappers.fromAnnotation(RELATED_RESOURCE_UID)); eventSourceManager.registerEventSource(eventSource);