Skip to content

Event Notification and Polling, Inbound Event Sources #720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion operator-framework-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
Expand All @@ -106,5 +105,20 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>jcache</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;

public interface ControllerConfiguration<R extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;

public class ControllerConfigurationOverrider<R extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;

public class DefaultControllerConfiguration<R extends HasMetadata>
implements ControllerConfiguration<R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ private boolean failOnMissingCurrentNS() {
return false;
}

public EventSourceManager<R> getEventSourceManager() {
return eventSourceManager;
}

public void stop() {
if (eventSourceManager != null) {
eventSourceManager.stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.Objects;

public class Event {

private final ResourceID relatedCustomResource;
Expand All @@ -18,4 +20,19 @@ public String toString() {
"relatedCustomResource=" + relatedCustomResource +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Event event = (Event) o;
return Objects.equals(relatedCustomResource, event.relatedCustomResource);
}

@Override
public int hashCode() {
return Objects.hash(relatedCustomResource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
Expand Down Expand Up @@ -297,7 +297,6 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
}

private void cleanupForDeletedEvent(ResourceID customResourceUid) {
eventSourceManager.cleanupForCustomResource(customResourceUid);
eventMarker.cleanup(customResourceUid);
metrics.cleanupDoneFor(customResourceUid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;

public class EventSourceManager<R extends HasMetadata>
implements EventSourceRegistry<R>, LifecycleAware {
Expand Down Expand Up @@ -107,14 +109,22 @@ public final void registerEventSource(EventSource eventSource)
}
}

public void cleanupForCustomResource(ResourceID customResourceUid) {
lock.lock();
try {
for (EventSource eventSource : this.eventSources) {
eventSource.cleanupForResource(customResourceUid);
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
for (EventSource eventSource : this.eventSources) {
if (eventSource instanceof ResourceEventAware) {
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource);
switch (action) {
case ADDED:
lifecycleAwareES.onResourceCreated(resource);
break;
case UPDATED:
lifecycleAwareES.onResourceUpdated(resource, oldResource);
break;
case DELETED:
lifecycleAwareES.onResourceDeleted(resource);
break;
}
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;

public class ResourceID {
public class ResourceID implements Serializable {

public static ResourceID fromResource(HasMetadata resource) {
return new ResourceID(resource.getMetadata().getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ public abstract class AbstractEventSource implements EventSource {
public void setEventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Optional;

import javax.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

/**
* Base class for event sources with filtering and caching capabilities.
* <p>
* {@link #handleDelete(ResourceID)} - if the related resource is present in the cache it is
* removed. and event propagated. There is no event propagated if the resource is not in the cache.
* <p>
* {@link #handleEvent(Object, ResourceID)} - caches the resource if changed (or nto present
* before). Propagates an event if the resource is new or not equals to the one in the cache, and if
* accepted by the filter if one is present.
*
* @param <T> represents the resource (usually external non-kubernetes one) handled.
*/
public abstract class CachingFilteringEventSource<T> extends LifecycleAwareEventSource {

private static final Logger log = LoggerFactory.getLogger(CachingFilteringEventSource.class);

protected Cache<ResourceID, T> cache;
protected EventFilter<T> eventFilter;

public CachingFilteringEventSource(Cache<ResourceID, T> cache, EventFilter<T> eventFilter) {
this.cache = cache;
this.eventFilter = eventFilter;
}

protected void handleDelete(ResourceID relatedResourceID) {
if (!isRunning()) {
log.debug("Received event but event for {} source is not running", relatedResourceID);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logging doesn't make sense

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The though was to make is easier on debug level to find out the life cycle of an event. If you prefer I can remove this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the message that's output… I'm not sure what it means :)

return;
}
var cachedValue = cache.get(relatedResourceID);
cache.remove(relatedResourceID);
// we only propagate event if the resource was previously in cache
if (cachedValue != null
&& (eventFilter == null || eventFilter.acceptDelete(cachedValue, relatedResourceID))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you not want to propagate a delete event?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Wanted to make a generic filter, but this might be over engineered. We can remove the filters for now and see if someone is asking for it.

eventHandler.handleEvent(new Event(relatedResourceID));
}
}

protected void handleEvent(T value, ResourceID relatedResourceID) {
if (!isRunning()) {
log.debug("Received event but event for {} source is not running", relatedResourceID);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logging doesn't make sense

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same :)

return;
}
var cachedValue = cache.get(relatedResourceID);
if (cachedValue == null || !cachedValue.equals(value)) {
cache.put(relatedResourceID, value);
if (eventFilter == null || eventFilter.accept(value, cachedValue, relatedResourceID)) {
eventHandler.handleEvent(new Event(relatedResourceID));
}
}
}

public Cache<ResourceID, T> getCache() {
return cache;
}

public Optional<T> getCachedValue(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}

@Override
public void stop() throws OperatorException {
super.stop();
cache.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.javaoperatorsdk.operator.processing.event.source;

import io.javaoperatorsdk.operator.processing.event.ResourceID;

public interface EventFilter<T> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concept of event filter actually doesn't really make much sense to me… the events come from event sources which you write, so presumably, the event source decide of which events trigger the event handler so I'm not sure why we need to filter them again…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, the idea was to separate that two thing, maybe to reuse some filters. But agree, I will remove filtering


boolean accept(T newValue, T oldValue, ResourceID relatedResourceID);

default boolean acceptDelete(T value, ResourceID relatedResourceID) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@

import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public interface EventSource extends LifecycleAware {

void setEventHandler(EventHandler eventHandler);

/**
* Automatically called when a custom resource is deleted from the cluster.
*
* @param customResourceUid - id of custom resource
*/
default void cleanupForResource(ResourceID customResourceUid) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;

public interface EventSourceRegistry<T extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.javaoperatorsdk.operator.processing.event.source;

import io.javaoperatorsdk.operator.OperatorException;

public abstract class LifecycleAwareEventSource extends AbstractEventSource {

private volatile boolean running = false;

public boolean isRunning() {
return running;
}

@Override
public void start() throws OperatorException {
running = true;
}

@Override
public void stop() throws OperatorException {
running = false;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.javaoperatorsdk.operator.processing.event.source;

import io.fabric8.kubernetes.api.model.HasMetadata;

public interface ResourceEventAware<T extends HasMetadata> {

default void onResourceCreated(T resource) {}

default void onResourceUpdated(T newResource, T oldResource) {}

default void onResourceDeleted(T resource) {}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.javaoperatorsdk.operator.processing.event.source;
package io.javaoperatorsdk.operator.processing.event.source.controller;

import java.util.Map;
import java.util.Optional;
Expand All @@ -11,7 +11,7 @@
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

import static io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
import static io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;

public class ControllerResourceCache<T extends HasMetadata> implements ResourceCache<T> {

Expand Down
Loading