Skip to content

Enhance event handling #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -15,17 +15,22 @@
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("rawtypes")
public class Operator {
public class Operator implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(Operator.class);
private final KubernetesClient k8sClient;
private final ConfigurationService configurationService;
private final ObjectMapper objectMapper;
private final List<Closeable> closeables;

public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
this(k8sClient, configurationService, new ObjectMapper());
@@ -38,6 +43,7 @@ public Operator(
this.k8sClient = k8sClient;
this.configurationService = configurationService;
this.objectMapper = objectMapper;
this.closeables = new ArrayList<>();
}

/**
@@ -64,6 +70,21 @@ public void start() {
}
}

/** Stop the operator. */
@Override
public void close() {
log.info("Operator {} is shutting down...", configurationService.getVersion().getSdkVersion());

for (Closeable closeable : this.closeables) {
try {
log.debug("closing {}", closeable);
closeable.close();
} catch (IOException e) {
log.warn("Error closing {}", closeable, e);
}
}
}

/**
* Registers the specified controller with this operator.
*
@@ -160,10 +181,15 @@ public <R extends CustomResource> void register(
customResourceCache,
watchAllNamespaces,
targetNamespaces,
defaultEventHandler,
configuration.isGenerationAware(),
finalizer);
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
finalizer,
resClass);

closeables.add(customResourceEventSource);
closeables.add(eventSourceManager);

customResourceEventSource.setEventHandler(defaultEventHandler);
customResourceEventSource.start();

log.info(
"Registered Controller: '{}' for CRD: '{}' for namespace(s): {}",
@@ -178,18 +204,14 @@ private CustomResourceEventSource createCustomResourceEventSource(
CustomResourceCache customResourceCache,
boolean watchAllNamespaces,
String[] targetNamespaces,
DefaultEventHandler defaultEventHandler,
boolean generationAware,
String finalizer) {
CustomResourceEventSource customResourceEventSource =
watchAllNamespaces
? CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
customResourceCache, client, generationAware, finalizer)
: CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(
customResourceCache, client, targetNamespaces, generationAware, finalizer);

customResourceEventSource.setEventHandler(defaultEventHandler);

return customResourceEventSource;
String finalizer,
Class<?> resClass) {

return watchAllNamespaces
? CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
customResourceCache, client, generationAware, finalizer, resClass)
: CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(
customResourceCache, client, targetNamespaces, generationAware, finalizer, resClass);
}
}
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ public class DefaultEventHandler implements EventHandler {
private final EventDispatcher eventDispatcher;
private final Retry retry;
private final Map<String, RetryExecution> retryState = new HashMap<>();
private final String controllerName;
private DefaultEventSourceManager eventSourceManager;

private final ReentrantLock lock = new ReentrantLock();
@@ -50,6 +51,7 @@ public DefaultEventHandler(
this.customResourceCache = customResourceCache;
this.eventDispatcher = eventDispatcher;
this.retry = retry;
this.controllerName = relatedControllerName;
eventBuffer = new EventBuffer();
executor =
new ScheduledThreadPoolExecutor(
@@ -70,6 +72,16 @@ public DefaultEventHandler(
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
}

@Override
public void close() {
if (eventSourceManager != null) {
log.debug("Closing EventSourceManager {} -> {}", controllerName, eventSourceManager);
eventSourceManager.close();
}

executor.shutdownNow();
}

public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
this.eventSourceManager = eventSourceManager;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.javaoperatorsdk.operator.processing.event;

import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -17,9 +17,8 @@ public class DefaultEventSourceManager implements EventSourceManager {
private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class);

private final ReentrantLock lock = new ReentrantLock();
private Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
private CustomResourceEventSource customResourceEventSource;
Copy link
Collaborator

@csviri csviri Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see it correctly that this has been CustomResourceEventSource is moved to the Operator? Why is that?
The intention was that this class will own all the custom resources related to a controller.
But are there particular reasons to move it?
The downside is that if we want to have other events,between event sources and controllers vs event source, but also access access CustomResourceEventSource (where we thinking to move the caching for example) will not be accessible in this manager.

Copy link
Collaborator Author

@lburgazzoli lburgazzoli Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason was that the method:

public void registerCustomResourceEventSource(
      CustomResourceEventSource customResourceEventSource) {
    this.customResourceEventSource = customResourceEventSource;
    this.customResourceEventSource.addedToEventManager();
}

Was not really meaningful as yes the customResourceEventSource was assigned to an instance of the DefaultEventManager but was never accesses by the manager.

A better option would have been either to use registerEventSource as any other event source or pass the customResourceEventSource as a constructor parameter to make it clear that the DefaultEventManager requires it whereas as today, it looks like an unrelated method.

The event handler for the CustomResourceEventSource was also set outside the event manager directly in the operator which is quite confusing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, ok, yes this part definetelly needs improvements and refactorings. (Just like the others :) )
As I mentioned I would like to keep the event sources in one place, I think the constructor would make sense.
CustomResourceEventSource currently has quite special position, it's required always as an event source, so constructor is good ide also do express this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I will probably make some other improvements after this, getting back to work on #392

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@csviri yep I think I got the reasoning and I do agree but for the sake of the lifecycle it was quite confusing so I opted to make at little bit more clear (i.e to have set-up / tear-down on the same side, in this case on the Operator). If we move it on the contructor then we can move its set-up/tear-down as part of the event manager.

private DefaultEventHandler defaultEventHandler;
private final Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
private final DefaultEventHandler defaultEventHandler;
private TimerEventSource retryTimerEventSource;

public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) {
@@ -30,23 +29,53 @@ public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolea
}
}

public void registerCustomResourceEventSource(
CustomResourceEventSource customResourceEventSource) {
this.customResourceEventSource = customResourceEventSource;
this.customResourceEventSource.addedToEventManager();
@Override
public void close() {
try {
lock.lock();
for (var entry : eventSources.entrySet()) {
try {
log.debug("Closing {} -> {}", entry.getKey(), entry.getValue());
entry.getValue().close();
} catch (Exception e) {
log.warn("Error closing {} -> {}", entry.getKey(), entry.getValue(), e);
}
}

eventSources.clear();
} finally {
lock.unlock();
}
}

@Override
public <T extends EventSource> void registerEventSource(String name, T eventSource) {
public final void registerEventSource(String name, EventSource eventSource) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you make this method final?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it is invoked by the constructor so either the class is marked as final or this method need to be final to avoid issues in classes extending DefaultEventSourceManager and overriding registerEventSource

Copy link
Collaborator

@metacosm metacosm Apr 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes. I remember hearing about this but I have never run into such issues in practice… Have you?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, got bitten hard by it 😂 that's why I notice them

Objects.requireNonNull(eventSource, "EventSource must not be null");

try {
lock.lock();
EventSource currentEventSource = eventSources.get(name);
if (currentEventSource != null) {
if (eventSources.containsKey(name)) {
throw new IllegalStateException(
"Event source with name already registered. Event source name: " + name);
}
eventSources.put(name, eventSource);
eventSource.setEventHandler(defaultEventHandler);
eventSource.start();
} finally {
lock.unlock();
}
}

@Override
public Optional<EventSource> deRegisterEventSource(String name) {
try {
lock.lock();
EventSource currentEventSource = eventSources.remove(name);
if (currentEventSource != null) {
currentEventSource.close();
}

return Optional.ofNullable(currentEventSource);
} finally {
lock.unlock();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package io.javaoperatorsdk.operator.processing.event;

public interface EventHandler {
import java.io.Closeable;

public interface EventHandler extends Closeable {

void handleEvent(Event event);

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
package io.javaoperatorsdk.operator.processing.event;

public interface EventSource {
import java.io.Closeable;

public interface EventSource extends Closeable {

/**
* This method is invoked when this {@link EventSource} instance is properly registered to a
* {@link EventSourceManager}.
*/
default void start() {}

/**
* This method is invoked when this {@link EventSource} instance is de-registered from a {@link
* EventSourceManager}.
*/
@Override
default void close() {}

void setEventHandler(EventHandler eventHandler);

Original file line number Diff line number Diff line change
@@ -1,14 +1,36 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.Closeable;
import java.util.Map;
import java.util.Optional;

public interface EventSourceManager {
public interface EventSourceManager extends Closeable {

<T extends EventSource> void registerEventSource(String name, T eventSource);
/**
* Add the {@link EventSource} identified by the given <code>name</code> to the event manager.
*
* @param name the name of the {@link EventSource} to add
* @param eventSource the {@link EventSource} to register
* @thorw IllegalStateException if an {@link EventSource} with the same name is already
* registered.
*/
void registerEventSource(String name, EventSource eventSource);

/**
* Remove the {@link EventSource} identified by the given <code>name</code> from the event
* manager.
*
* @param name the name of the {@link EventSource} to remove
* @return an optional {@link EventSource} which would be empty if no {@link EventSource} have
* been registered with the given name.
*/
Optional<EventSource> deRegisterEventSource(String name);

Optional<EventSource> deRegisterCustomResourceFromEventSource(
String name, String customResourceUid);

Map<String, EventSource> getRegisteredEventSources();

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
@@ -4,13 +4,16 @@
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl;
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils;
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@@ -23,62 +26,84 @@ public class CustomResourceEventSource extends AbstractEventSource
private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class);

private final CustomResourceCache resourceCache;
private MixedOperation client;
private final MixedOperation client;
private final String[] targetNamespaces;
private final boolean generationAware;
private final String resourceFinalizer;
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
private final List<Watch> watches;
private final String resClass;

public static CustomResourceEventSource customResourceEventSourceForAllNamespaces(
CustomResourceCache customResourceCache,
MixedOperation client,
boolean generationAware,
String resourceFinalizer) {
String resourceFinalizer,
Class<?> resClass) {
return new CustomResourceEventSource(
customResourceCache, client, null, generationAware, resourceFinalizer);
customResourceCache, client, null, generationAware, resourceFinalizer, resClass);
}

public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces(
CustomResourceCache customResourceCache,
MixedOperation client,
String[] namespaces,
boolean generationAware,
String resourceFinalizer) {
String resourceFinalizer,
Class<?> resClass) {
return new CustomResourceEventSource(
customResourceCache, client, namespaces, generationAware, resourceFinalizer);
customResourceCache, client, namespaces, generationAware, resourceFinalizer, resClass);
}

private CustomResourceEventSource(
CustomResourceCache customResourceCache,
MixedOperation client,
String[] targetNamespaces,
boolean generationAware,
String resourceFinalizer) {
String resourceFinalizer,
Class<?> resClass) {
this.resourceCache = customResourceCache;
this.client = client;
this.targetNamespaces = targetNamespaces;
this.generationAware = generationAware;
this.resourceFinalizer = resourceFinalizer;
this.watches = new ArrayList<>();
this.resClass = resClass.getName();
}

private boolean isWatchAllNamespaces() {
return targetNamespaces == null;
}

public void addedToEventManager() {
registerWatch();
}

private void registerWatch() {
@Override
public void start() {
CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client;
if (isWatchAllNamespaces()) {
crClient.inAnyNamespace().watch(this);
var w = crClient.inAnyNamespace().watch(this);
watches.add(w);
log.debug("Registered controller {} -> {} for any namespace", resClass, w);
} else if (targetNamespaces.length == 0) {
client.watch(this);
var w = client.watch(this);
watches.add(w);
log.debug(
"Registered controller {} -> {} for namespace {}", resClass, w, crClient.getNamespace());
} else {
for (String targetNamespace : targetNamespaces) {
crClient.inNamespace(targetNamespace).watch(this);
log.debug("Registered controller for namespace: {}", targetNamespace);
var w = crClient.inNamespace(targetNamespace).watch(this);
watches.add(w);
log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, targetNamespace);
}
}
}

@Override
public void close() {
for (Watch watch : this.watches) {
try {
log.debug("Closing watch {} -> {}", resClass, watch);
watch.close();
} catch (Exception e) {
log.warn("Error closing watcher {} -> {}", resClass, watch, e);
}
}
}
@@ -155,7 +180,8 @@ public void onClose(WatcherException e) {
if (e.isHttpGone()) {
log.warn("Received error for watch, will try to reconnect.", e);
try {
registerWatch();
close();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we really close all the watches? I guess there's no other option since we can't really know which watcher caused the exception…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the issue was that in case of an error on of a single watcher, calling registerWatch would re-register all the watcher, potentially you could end up in duplicated watchers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was another thing I wondered about (using a List for the watchers as opposed to a Set but I'm not sure we have consistent equals/hashCode implementations for the Watch implementations)…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we need a way to improve how watchers are handled, I may try to have a look once this is done

start();
} catch (Throwable ex) {
log.error("Unexpected error happened with watch reconnect. Will exit.", e);
System.exit(1);
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ class CustomResourceEventSourceTest {

private CustomResourceEventSource customResourceEventSource =
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
customResourceCache, mixedOperation, true, FINALIZER);
customResourceCache, mixedOperation, true, FINALIZER, TestCustomResource.class);

@BeforeEach
public void setup() {
@@ -73,7 +73,7 @@ public void normalExecutionIfGenerationChanges() {
public void handlesAllEventIfNotGenerationAware() {
customResourceEventSource =
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
customResourceCache, mixedOperation, false, FINALIZER);
customResourceCache, mixedOperation, false, FINALIZER, TestCustomResource.class);
setup();

TestCustomResource customResource1 = TestUtils.testCustomResource();