Skip to content

Event filter fix #1129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -6,6 +6,6 @@ public interface RecentOperationEventFilter<R> extends RecentOperationCacheFille

void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, R resource);

void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID, R resource);
void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID);

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.javaoperatorsdk.operator.processing.dependent;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
@@ -14,7 +12,6 @@ public abstract class AbstractEventSourceHolderDependentResource<R, P extends Ha
extends AbstractDependentResource<R, P>
implements EventSourceProvider<P> {
private T eventSource;
private boolean isFilteringEventSource;
private boolean isCacheFillerEventSource;

public EventSource initEventSource(EventSourceContext<P> context) {
@@ -25,9 +22,6 @@ public EventSource initEventSource(EventSourceContext<P> context) {
eventSource = createEventSource(context);
}

// but we still need to record which interfaces the event source implements even if it has
// already been set before this method is called
isFilteringEventSource = eventSource instanceof RecentOperationEventFilter;
isCacheFillerEventSource = eventSource instanceof RecentOperationCacheFiller;
return eventSource;
}
@@ -42,33 +36,6 @@ protected T eventSource() {
return eventSource;
}

protected R handleCreate(R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R created = null;
try {
prepareEventFiltering(desired, resourceID);
created = super.handleCreate(desired, primary, context);
return created;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(desired, resourceID, created);
throw e;
}
}

protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R updated = null;
try {
prepareEventFiltering(desired, resourceID);
updated = super.handleUpdate(actual, desired, primary, context);
return updated;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(desired, resourceID, updated);
throw e;
}
}


protected void onCreated(ResourceID primaryResourceId, R created) {
if (isCacheFillerEventSource) {
recentOperationCacheFiller().handleRecentResourceCreate(primaryResourceId, created);
@@ -81,22 +48,6 @@ protected void onUpdated(ResourceID primaryResourceId, R updated, R actual) {
}
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
if (isFilteringEventSource) {
recentOperationEventFilter().prepareForCreateOrUpdateEventFiltering(resourceID, desired);
}
}

private void cleanupAfterEventFiltering(R desired, ResourceID resourceID, R created) {
if (isFilteringEventSource) {
recentOperationEventFilter().cleanupOnCreateOrUpdateEventFiltering(resourceID, created);
}
}

@SuppressWarnings("unchecked")
private RecentOperationEventFilter<R> recentOperationEventFilter() {
return (RecentOperationEventFilter<R>) eventSource;
}

@SuppressWarnings("unchecked")
private RecentOperationCacheFiller<R> recentOperationCacheFiller() {
Original file line number Diff line number Diff line change
@@ -81,6 +81,33 @@ public void configureWith(InformerEventSource<R, P> informerEventSource) {
setEventSource(informerEventSource);
}


protected R handleCreate(R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(desired);
R created = null;
try {
prepareEventFiltering(desired, resourceID);
created = super.handleCreate(desired, primary, context);
return created;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(resourceID);
throw e;
}
}

protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(desired);
R updated = null;
try {
prepareEventFiltering(desired, resourceID);
updated = super.handleUpdate(actual, desired, primary, context);
return updated;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(resourceID);
throw e;
}
}

@SuppressWarnings("unused")
public R create(R target, P primary, Context<P> context) {
return prepare(target, primary, "Creating").create(target);
@@ -152,4 +179,13 @@ public KubernetesClient getKubernetesClient() {
protected R desired(P primary, Context<P> context) {
return super.desired(primary, context);
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
eventSource().prepareForCreateOrUpdateEventFiltering(resourceID, desired);
}

private void cleanupAfterEventFiltering(ResourceID resourceID) {
eventSource().cleanupOnCreateOrUpdateEventFiltering(resourceID);
}

}
Original file line number Diff line number Diff line change
@@ -231,11 +231,10 @@ public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resou
* Mean to be called to clean up in case of an exception from the client. Usually in a catch
* block.
*
* @param resource handled by the informer
* @param resourceID to cleanup
*/
@Override
public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID,
R resource) {
public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID) {
log.debug("Stopping event recording for: {}", resourceID);
eventRecorder.stopEventRecording(resourceID);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.javaoperatorsdk.operator;

import java.time.Duration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.javaoperatorsdk.operator.junit.OperatorExtension;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.ConfigMapDependentResource;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.OperationEventFilterCustomResource;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.OperationEventFilterCustomResourceSpec;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.OperationEventFilterCustomResourceTestReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class OperationEventFilterIT {

public static final String TEST = "test";
public static final String SPEC_VAL_1 = "val1";
public static final String SPEC_VAL_2 = "val2";

@RegisterExtension
OperatorExtension operator =
OperatorExtension.builder()
.withReconciler(new OperationEventFilterCustomResourceTestReconciler())
.build();

@Test
void reconcileNotTriggeredWithDependentResourceCreateOrUpdate() {
var resource = operator.create(OperationEventFilterCustomResource.class, createTestResource());

await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
.until(
() -> ((OperationEventFilterCustomResourceTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() == 1);
assertThat(operator.get(ConfigMap.class, TEST).getData())
.containsEntry(ConfigMapDependentResource.KEY, SPEC_VAL_1);

resource.getSpec().setValue(SPEC_VAL_2);
operator.replace(OperationEventFilterCustomResource.class, resource);

await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
.until(
() -> ((OperationEventFilterCustomResourceTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() == 2);
assertThat(operator.get(ConfigMap.class, TEST).getData())
.containsEntry(ConfigMapDependentResource.KEY, SPEC_VAL_2);
}


private OperationEventFilterCustomResource createTestResource() {
OperationEventFilterCustomResource cr = new OperationEventFilterCustomResource();
cr.setMetadata(new ObjectMeta());
cr.getMetadata().setName(TEST);
cr.setSpec(new OperationEventFilterCustomResourceSpec());
cr.getSpec().setValue(SPEC_VAL_1);
return cr;
}

}
Original file line number Diff line number Diff line change
@@ -28,14 +28,12 @@ void managedDependentsAreReconciledInOrder() {

await().atMost(Duration.ofSeconds(5))
.until(() -> ((OrderedManagedDependentTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() >= 1);
// todo change to more precise values when event filtering is fixed
// assertThat(OrderedManagedDependentTestReconciler.dependentExecution).hasSize(4);
.getNumberOfExecutions() == 1);

assertThat(OrderedManagedDependentTestReconciler.dependentExecution.get(0))
.isEqualTo(ConfigMapDependentResource1.class);
assertThat(OrderedManagedDependentTestReconciler.dependentExecution.get(1))
.isEqualTo(ConfigMapDependentResource2.class);

}


Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ public UpdateControl<CreateUpdateEventFilterTestCustomResource> reconcile(
informerEventSource.handleRecentResourceCreate(resourceID, configMap);
} catch (RuntimeException e) {
informerEventSource
.cleanupOnCreateOrUpdateEventFiltering(resourceID, configMapToCreate);
.cleanupOnCreateOrUpdateEventFiltering(resourceID);
throw e;
}
} else {
@@ -76,7 +76,7 @@ public UpdateControl<CreateUpdateEventFilterTestCustomResource> reconcile(
newConfigMap, configMap);
} catch (RuntimeException e) {
informerEventSource
.cleanupOnCreateOrUpdateEventFiltering(resourceID, configMap);
.cleanupOnCreateOrUpdateEventFiltering(resourceID);
throw e;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

import java.util.HashMap;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUKubernetesDependentResource;

public class ConfigMapDependentResource extends
CRUKubernetesDependentResource<ConfigMap, OperationEventFilterCustomResource> {

public static final String KEY = "key1";

public ConfigMapDependentResource() {
super(ConfigMap.class);
}

@Override
protected ConfigMap desired(OperationEventFilterCustomResource primary,
Context<OperationEventFilterCustomResource> context) {

ConfigMap configMap = new ConfigMap();
configMap.setMetadata(new ObjectMeta());
configMap.getMetadata().setName(primary.getMetadata().getName());
configMap.getMetadata().setNamespace(primary.getMetadata().getNamespace());
HashMap<String, String> data = new HashMap<>();
data.put(KEY, primary.getSpec().getValue());
configMap.setData(data);
return configMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Kind;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@Kind("OperationEventFilterCustomResource")
@ShortNames("oef")
public class OperationEventFilterCustomResource
extends CustomResource<OperationEventFilterCustomResourceSpec, String>
implements Namespaced {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

public class OperationEventFilterCustomResourceSpec {

private String value;

public String getValue() {
return value;
}

public OperationEventFilterCustomResourceSpec setValue(String value) {
this.value = value;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

import java.util.concurrent.atomic.AtomicInteger;

import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent;
import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider;

@ControllerConfiguration(
namespaces = Constants.WATCH_CURRENT_NAMESPACE,
dependents = {
@Dependent(type = ConfigMapDependentResource.class),
})
public class OperationEventFilterCustomResourceTestReconciler
implements Reconciler<OperationEventFilterCustomResource>,
TestExecutionInfoProvider {

private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

@Override
public UpdateControl<OperationEventFilterCustomResource> reconcile(
OperationEventFilterCustomResource resource,
Context<OperationEventFilterCustomResource> context) {
numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
}

public int getNumberOfExecutions() {
return numberOfExecutions.get();
}

}