Skip to content

Commit 844323f

Browse files
authored
Temporal resource cache in Event Source (#965)
1 parent ae4ad56 commit 844323f

28 files changed

+1304
-237
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ protected DefaultInformerConfiguration(ConfigurationService service, String labe
3737
Objects.requireNonNullElseGet(associatedWith, () -> ResourceID::fromResource);
3838
}
3939

40+
4041
public PrimaryResourcesRetriever<R> getPrimaryResourcesRetriever() {
4142
return secondaryToPrimaryResourcesIdSet;
4243
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public void start() throws OperatorException {
214214
}
215215

216216
final var context = new EventSourceContext<>(
217-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
217+
eventSourceManager.getControllerResourceEventSource(),
218218
configurationService(), kubernetesClient);
219219

220220
prepareEventSources(context).forEach(eventSourceManager::registerEventSource);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,4 @@
3232
* @return the label selector
3333
*/
3434
String labelSelector() default EMPTY_STRING;
35-
3635
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,30 +75,46 @@ private void configureWith(ConfigurationService configService, String labelSelec
7575
.withPrimaryResourcesRetriever(primaryResourcesRetriever)
7676
.withAssociatedSecondaryResourceIdentifier(secondaryResourceIdentifier)
7777
.build();
78-
configureWith(configService, new InformerEventSource<>(ic, client), addOwnerReference);
78+
configureWith(new InformerEventSource<>(ic, client), addOwnerReference);
7979
}
8080

8181
/**
8282
* Use to share informers between event more resources.
83-
*
84-
* @param configurationService get configs
83+
*
8584
* @param informerEventSource informer to use
8685
* @param addOwnerReference to the created resource
8786
*/
88-
public void configureWith(ConfigurationService configurationService,
87+
public void configureWith(
8988
InformerEventSource<R, P> informerEventSource,
9089
boolean addOwnerReference) {
9190
this.informerEventSource = informerEventSource;
9291
this.addOwnerReference = addOwnerReference;
9392
}
9493

9594
public void create(R target, P primary, Context context) {
96-
prepare(target, primary, "Creating").create(target);
95+
var resourceID = ResourceID.fromResource(target);
96+
try {
97+
informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID);
98+
var created = prepare(target, primary, "Creating").create(target);
99+
informerEventSource.handleRecentResourceCreate(created);
100+
} catch (RuntimeException e) {
101+
informerEventSource.cleanupOnCreateOrUpdateEventFiltering(resourceID);
102+
throw e;
103+
}
97104
}
98105

99106
public void update(R actual, R target, P primary, Context context) {
100-
var updatedActual = processor.replaceSpecOnActual(actual, target, context);
101-
prepare(target, primary, "Updating").replace(updatedActual);
107+
var resourceID = ResourceID.fromResource(target);
108+
try {
109+
var updatedActual = processor.replaceSpecOnActual(actual, target, context);
110+
informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID);
111+
var updated = prepare(target, primary, "Updating").replace(updatedActual);
112+
informerEventSource.handleRecentResourceUpdate(updated,
113+
actual.getMetadata().getResourceVersion());
114+
} catch (RuntimeException e) {
115+
informerEventSource.cleanupOnCreateOrUpdateEventFiltering(resourceID);
116+
throw e;
117+
}
102118
}
103119

104120
public boolean match(R actualResource, R desiredResource, Context context) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
3030

3131
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
32-
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
3332

3433
class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {
3534

@@ -50,7 +49,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
5049

5150
EventProcessor(EventSourceManager<R> eventSourceManager) {
5251
this(
53-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
52+
eventSourceManager.getControllerResourceEventSource(),
5453
ExecutorServiceManager.instance().executorService(),
5554
eventSourceManager.getController().getConfiguration().getName(),
5655
new ReconciliationDispatcher<>(eventSourceManager.getController()),
@@ -73,7 +72,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
7372
Retry retry,
7473
Metrics metrics) {
7574
this(
76-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
75+
eventSourceManager.getControllerResourceEventSource(),
7776
null,
7877
relatedControllerName,
7978
reconciliationDispatcher,
@@ -208,12 +207,12 @@ void eventProcessingFinished(
208207
if (eventMarker.deleteEventPresent(resourceID)) {
209208
cleanupForDeletedEvent(executionScope.getCustomResourceID());
210209
} else {
210+
postExecutionControl.getUpdatedCustomResource().ifPresent(r -> {
211+
eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate(r,
212+
executionScope.getResource().getMetadata().getResourceVersion());
213+
});
211214
if (eventMarker.eventPresent(resourceID)) {
212-
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
213-
submitReconciliationExecution(resourceID);
214-
} else {
215-
postponeReconciliationAndHandleCacheSyncEvent(resourceID);
216-
}
215+
submitReconciliationExecution(resourceID);
217216
} else {
218217
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
219218
}
@@ -223,41 +222,6 @@ void eventProcessingFinished(
223222
}
224223
}
225224

226-
private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) {
227-
eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID);
228-
}
229-
230-
private boolean isCacheReadyForInstantReconciliation(
231-
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
232-
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
233-
return true;
234-
}
235-
String originalResourceVersion = getVersion(executionScope.getResource());
236-
String customResourceVersionAfterExecution =
237-
getVersion(
238-
postExecutionControl
239-
.getUpdatedCustomResource()
240-
.orElseThrow(
241-
() -> new IllegalStateException(
242-
"Updated custom resource must be present at this point of time")));
243-
String cachedCustomResourceVersion =
244-
getVersion(
245-
cache
246-
.get(executionScope.getCustomResourceID())
247-
.orElseThrow(
248-
() -> new IllegalStateException(
249-
"Cached custom resource must be present at this point")));
250-
251-
if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
252-
return true;
253-
}
254-
// If the cached resource version equals neither the version before nor after execution
255-
// probably an update happened on the custom resource independent of the framework during
256-
// reconciliation. We cannot tell at this point if it happened before our update or before.
257-
// (Well we could if we would parse resource version, but that should not be done by definition)
258-
return !cachedCustomResourceVersion.equals(originalResourceVersion);
259-
}
260-
261225
private void reScheduleExecutionIfInstructed(
262226
PostExecutionControl<R> postExecutionControl, R customResource) {
263227
postExecutionControl

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.javaoperatorsdk.operator.processing.Controller;
1414
import io.javaoperatorsdk.operator.processing.MDCUtils;
1515
import io.javaoperatorsdk.operator.processing.event.ResourceID;
16-
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
1716
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
1817

1918
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
@@ -25,30 +24,19 @@ public class ControllerResourceEventSource<T extends HasMetadata>
2524
implements ResourceEventHandler<T> {
2625

2726
public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
28-
2927
private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class);
3028

3129
private final Controller<T> controller;
3230
private final ResourceEventFilter<T> filter;
33-
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
3431

3532
public ControllerResourceEventSource(Controller<T> controller) {
3633
super(controller.getCRClient(), controller.getConfiguration());
3734
this.controller = controller;
38-
3935
var filters = new ResourceEventFilter[] {
4036
ResourceEventFilters.finalizerNeededAndApplied(),
4137
ResourceEventFilters.markedForDeletion(),
4238
ResourceEventFilters.generationAware(),
43-
null
4439
};
45-
46-
if (controller.getConfiguration().isGenerationAware()) {
47-
onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>();
48-
filters[filters.length - 1] = onceWhitelistEventFilterEventFilter;
49-
} else {
50-
onceWhitelistEventFilterEventFilter = null;
51-
}
5240
if (controller.getConfiguration().getEventFilter() != null) {
5341
filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters));
5442
} else {
@@ -87,36 +75,22 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) {
8775

8876
@Override
8977
public void onAdd(T resource) {
78+
super.onAdd(resource);
9079
eventReceived(ResourceAction.ADDED, resource, null);
9180
}
9281

9382
@Override
9483
public void onUpdate(T oldCustomResource, T newCustomResource) {
84+
super.onUpdate(oldCustomResource, newCustomResource);
9585
eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource);
9686
}
9787

9888
@Override
9989
public void onDelete(T resource, boolean b) {
90+
super.onDelete(resource, b);
10091
eventReceived(ResourceAction.DELETED, resource, null);
10192
}
10293

103-
public ResourceCache<T> getResourceCache() {
104-
return manager();
105-
}
106-
107-
/**
108-
* This will ensure that the next event received after this method is called will not be filtered
109-
* out.
110-
*
111-
* @param resourceID - to which the event is related
112-
*/
113-
public void whitelistNextEvent(ResourceID resourceID) {
114-
if (onceWhitelistEventFilterEventFilter != null) {
115-
onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID);
116-
}
117-
}
118-
119-
12094
private void handleKubernetesClientException(Exception e) {
12195
KubernetesClientException ke = (KubernetesClientException) e;
12296
if (404 == ke.getCode()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.informer;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
10+
11+
public class EventRecorder<R extends HasMetadata> {
12+
13+
private final Map<ResourceID, ArrayList<R>> resourceEvents = new ConcurrentHashMap<>();
14+
15+
void startEventRecording(ResourceID resourceID) {
16+
resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5));
17+
}
18+
19+
public boolean isRecordingFor(ResourceID resourceID) {
20+
return resourceEvents.get(resourceID) != null;
21+
}
22+
23+
public void stopEventRecording(ResourceID resourceID) {
24+
resourceEvents.remove(resourceID);
25+
}
26+
27+
public void recordEvent(R resource) {
28+
resourceEvents.get(ResourceID.fromResource(resource)).add(resource);
29+
}
30+
31+
public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) {
32+
List<R> events = resourceEvents.get(resourceID);
33+
if (events == null) {
34+
return false;
35+
}
36+
if (events.isEmpty()) {
37+
return false;
38+
} else {
39+
return events.stream()
40+
.anyMatch(e -> e.getMetadata().getResourceVersion().equals(resourceVersion));
41+
}
42+
}
43+
44+
public boolean containsEventWithVersionButItsNotLastOne(
45+
ResourceID resourceID, String resourceVersion) {
46+
List<R> resources = resourceEvents.get(resourceID);
47+
if (resources == null) {
48+
throw new IllegalStateException(
49+
"Null events list, this is probably a result of invalid usage of the " +
50+
"InformerEventSource. Resource ID: " + resourceID);
51+
}
52+
if (resources.isEmpty()) {
53+
throw new IllegalStateException("No events for resource id: " + resourceID);
54+
}
55+
return !resources
56+
.get(resources.size() - 1)
57+
.getMetadata()
58+
.getResourceVersion()
59+
.equals(resourceVersion);
60+
}
61+
62+
public R getLastEvent(ResourceID resourceID) {
63+
List<R> resources = resourceEvents.get(resourceID);
64+
if (resources == null) {
65+
throw new IllegalStateException(
66+
"Null events list, this is probably a result of invalid usage of the " +
67+
"InformerEventSource. Resource ID: " + resourceID);
68+
}
69+
return resources.get(resources.size() - 1);
70+
}
71+
}

0 commit comments

Comments
 (0)