Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5520c14

Browse files
authoredMay 16, 2025··
improve: status cache for next reconciliation - only the lock version (#2800)
1 parent 937a9a9 commit 5520c14

File tree

16 files changed

+306
-620
lines changed

16 files changed

+306
-620
lines changed
 

‎docs/content/en/docs/documentation/reconciler.md

Lines changed: 17 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,23 @@ From v5, by default, the finalizer is added using Server Side Apply. See also `U
175175
It is typical to want to update the status subresource with the information that is available during the reconciliation.
176176
This is sometimes referred to as the last observed state. When the primary resource is updated, though, the framework
177177
does not cache the resource directly, relying instead on the propagation of the update to the underlying informer's
178-
cache. It can, therefore, happen that, if other events trigger other reconciliations before the informer cache gets
178+
cache. It can, therefore, happen that, if other events trigger other reconciliations, before the informer cache gets
179179
updated, your reconciler does not see the latest version of the primary resource. While this might not typically be a
180180
problem in most cases, as caches eventually become consistent, depending on your reconciliation logic, you might still
181-
require the latest status version possible, for example if the status subresource is used as a communication mechanism,
182-
see [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values)
181+
require the latest status version possible, for example, if the status subresource is used to store allocated values.
182+
See [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values)
183183
from the Kubernetes docs for more details.
184184

185-
The framework provides utilities to help with these use cases with
186-
[`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java).
187-
These utility methods come in two flavors:
185+
The framework provides the
186+
[`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java) utility class
187+
to help with these use cases.
188188

189-
#### Using internal cache
190-
191-
In almost all cases for this purpose, you can use internal caches:
189+
This class' methods use internal caches in combination with update methods that leveraging
190+
optimistic locking. If the update method fails on optimistic locking, it will retry
191+
using a fresh resource from the server as base for modification.
192192

193193
```java
194-
@Override
194+
@Override
195195
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
196196
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {
197197

@@ -201,85 +201,17 @@ public UpdateControl<StatusPatchCacheCustomResource> reconcile(
201201
var freshCopy = createFreshCopy(primary);
202202
freshCopy.getStatus().setValue(statusWithState());
203203

204-
var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context);
205-
206-
return UpdateControl.noUpdate();
207-
}
208-
```
209-
210-
In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal
211-
cache and will make sure that the next reconciliation will contain the most recent version of the resource. Note that it
212-
is not necessarily the version of the resource you got as response from the update, it can be newer since other parties
213-
can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status.
214-
215-
See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal).
216-
217-
This approach works with the default configuration of the framework and should be good to go in most of the cases.
218-
Without going further into the details, this won't work if `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching`
219-
is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution:
220-
221-
#### Fallback approach: using `PrimaryResourceCache` cache
222-
223-
As an alternative, for very rare cases when `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching`
224-
needs to be set to `false` you can use an explicit caching approach:
225-
226-
```java
227-
228-
// We on purpose don't use the provided predicate to show what a custom one could look like.
229-
private final PrimaryResourceCache<StatusPatchPrimaryCacheCustomResource> cache =
230-
new PrimaryResourceCache<>(
231-
(statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) ->
232-
statusPatchCacheCustomResource.getStatus().getValue()
233-
>= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue());
234-
235-
@Override
236-
public UpdateControl<StatusPatchPrimaryCacheCustomResource> reconcile(
237-
StatusPatchPrimaryCacheCustomResource primary,
238-
Context<StatusPatchPrimaryCacheCustomResource> context) {
239-
240-
// cache will compare the current and the cached resource and return the more recent. (And evict the old)
241-
primary = cache.getFreshResource(primary);
242-
243-
// omitted logic
244-
245-
var freshCopy = createFreshCopy(primary);
204+
var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context);
246205

247-
freshCopy.getStatus().setValue(statusWithState());
248-
249-
var updated =
250-
PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache);
251-
252206
return UpdateControl.noUpdate();
253207
}
254-
255-
@Override
256-
public DeleteControl cleanup(
257-
StatusPatchPrimaryCacheCustomResource resource,
258-
Context<StatusPatchPrimaryCacheCustomResource> context)
259-
throws Exception {
260-
// cleanup the cache on resource deletion
261-
cache.cleanup(resource);
262-
return DeleteControl.defaultDelete();
263-
}
264-
265208
```
266209

267-
[`PrimaryResourceCache`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java)
268-
is designed for this purpose. As shown in the example above, it is up to you to provide a predicate to determine if the
269-
resource is more recent than the one available. In other words, when to evict the resource from the cache. Typically, as
270-
shown in
271-
the [integration test](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache)
272-
you can have a counter in status to check on that.
273-
274-
Since all of this happens explicitly, you cannot use this approach for managed dependent resources and workflows and
275-
will need to use the unmanaged approach instead. This is due to the fact that managed dependent resources always get
276-
their associated primary resource from the underlying informer event source cache.
277-
278-
#### Additional remarks
210+
After the update `PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource` puts the result of the update into an internal
211+
cache and the framework will make sure that the next reconciliation contains the most recent version of the resource.
212+
Note that it is not necessarily the same version returned as response from the update, it can be a newer version since other parties
213+
can do additional updates meanwhile. However, unless it has been explicitly modified, that
214+
resource will contain the up-to-date status.
279215

280-
As shown in the integration tests, there is no optimistic locking used when updating the
281-
[resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41)
282-
(in other words `metadata.resourceVersion` is set to `null`). This is desired since you don't want the patch to fail on
283-
update.
284216

285-
In addition, you can configure the [Fabric8 client retry](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client).
217+
See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache).
Lines changed: 132 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,146 +1,85 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3-
import java.util.function.Supplier;
43
import java.util.function.UnaryOperator;
54

65
import org.slf4j.Logger;
76
import org.slf4j.LoggerFactory;
87

98
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.fabric8.kubernetes.client.KubernetesClientException;
1010
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
1111
import io.fabric8.kubernetes.client.dsl.base.PatchType;
12-
import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache;
12+
import io.javaoperatorsdk.operator.OperatorException;
1313
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1414

1515
/**
1616
* Utility methods to patch the primary resource state and store it to the related cache, to make
17-
* sure that fresh resource is present for the next reconciliation. The main use case for such
18-
* updates is to store state is resource status. Use of optimistic locking is not desired for such
19-
* updates, since we don't want to patch fail and lose information that we want to store.
17+
* sure that the latest version of the resource is present for the next reconciliation. The main use
18+
* case for such updates is to store state is resource status.
19+
*
20+
* <p>The way the framework handles this is with retryable updates with optimistic locking, and
21+
* caches the updated resource from the response in an overlay cache on top of the Informer cache.
22+
* If the update fails, it reads the primary resource from the cluster, applies the modifications
23+
* again and retries the update.
2024
*/
2125
public class PrimaryUpdateAndCacheUtils {
2226

27+
public static final int DEFAULT_MAX_RETRY = 10;
28+
2329
private PrimaryUpdateAndCacheUtils() {}
2430

2531
private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class);
2632

2733
/**
28-
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
29-
* Using update (PUT) method.
30-
*
31-
* @param primary resource
32-
* @param context of reconciliation
33-
* @return updated resource
34-
* @param <P> primary resource type
35-
*/
36-
public static <P extends HasMetadata> P updateAndCacheStatus(P primary, Context<P> context) {
37-
logWarnIfResourceVersionPresent(primary);
38-
return patchAndCacheStatus(
39-
primary, context, () -> context.getClient().resource(primary).updateStatus());
40-
}
41-
42-
/**
43-
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
44-
* Using JSON Merge patch.
45-
*
46-
* @param primary resource
47-
* @param context of reconciliation
48-
* @return updated resource
49-
* @param <P> primary resource type
34+
* Updates the status with optimistic locking and caches the result for next reconciliation. For
35+
* details see {@link #updateAndCacheResource}.
5036
*/
51-
public static <P extends HasMetadata> P patchAndCacheStatus(P primary, Context<P> context) {
52-
logWarnIfResourceVersionPresent(primary);
53-
return patchAndCacheStatus(
54-
primary, context, () -> context.getClient().resource(primary).patchStatus());
37+
public static <P extends HasMetadata> P updateStatusAndCacheResource(
38+
P primary, Context<P> context, UnaryOperator<P> modificationFunction) {
39+
return updateAndCacheResource(
40+
primary,
41+
context,
42+
modificationFunction,
43+
r -> context.getClient().resource(r).updateStatus());
5544
}
5645

5746
/**
58-
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
59-
* Using JSON Patch.
60-
*
61-
* @param primary resource
62-
* @param context of reconciliation
63-
* @return updated resource
64-
* @param <P> primary resource type
47+
* Patches the status using JSON Merge Patch with optimistic locking and caches the result for
48+
* next reconciliation. For details see {@link #updateAndCacheResource}.
6549
*/
66-
public static <P extends HasMetadata> P editAndCacheStatus(
67-
P primary, Context<P> context, UnaryOperator<P> operation) {
68-
logWarnIfResourceVersionPresent(primary);
69-
return patchAndCacheStatus(
70-
primary, context, () -> context.getClient().resource(primary).editStatus(operation));
50+
public static <P extends HasMetadata> P mergePatchStatusAndCacheResource(
51+
P primary, Context<P> context, UnaryOperator<P> modificationFunction) {
52+
return updateAndCacheResource(
53+
primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus());
7154
}
7255

7356
/**
74-
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
75-
*
76-
* @param primary resource
77-
* @param context of reconciliation
78-
* @param patch free implementation of cache
79-
* @return the updated resource.
80-
* @param <P> primary resource type
57+
* Patches the status using JSON Patch with optimistic locking and caches the result for next
58+
* reconciliation. For details see {@link #updateAndCacheResource}.
8159
*/
82-
public static <P extends HasMetadata> P patchAndCacheStatus(
83-
P primary, Context<P> context, Supplier<P> patch) {
84-
var updatedResource = patch.get();
85-
context
86-
.eventSourceRetriever()
87-
.getControllerEventSource()
88-
.handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary);
89-
return updatedResource;
60+
public static <P extends HasMetadata> P patchStatusAndCacheResource(
61+
P primary, Context<P> context, UnaryOperator<P> modificationFunction) {
62+
return updateAndCacheResource(
63+
primary,
64+
context,
65+
UnaryOperator.identity(),
66+
r -> context.getClient().resource(r).editStatus(modificationFunction));
9067
}
9168

9269
/**
93-
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
94-
* Using Server Side Apply.
95-
*
96-
* @param primary resource
97-
* @param freshResourceWithStatus - fresh resource with target state
98-
* @param context of reconciliation
99-
* @return the updated resource.
100-
* @param <P> primary resource type
70+
* Patches the status using Server Side Apply with optimistic locking and caches the result for
71+
* next reconciliation. For details see {@link #updateAndCacheResource}.
10172
*/
102-
public static <P extends HasMetadata> P ssaPatchAndCacheStatus(
73+
public static <P extends HasMetadata> P ssaPatchStatusAndCacheResource(
10374
P primary, P freshResourceWithStatus, Context<P> context) {
104-
logWarnIfResourceVersionPresent(freshResourceWithStatus);
105-
var res =
106-
context
107-
.getClient()
108-
.resource(freshResourceWithStatus)
109-
.subresource("status")
110-
.patch(
111-
new PatchContext.Builder()
112-
.withForce(true)
113-
.withFieldManager(context.getControllerConfiguration().fieldManager())
114-
.withPatchType(PatchType.SERVER_SIDE_APPLY)
115-
.build());
116-
117-
context
118-
.eventSourceRetriever()
119-
.getControllerEventSource()
120-
.handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary);
121-
return res;
122-
}
123-
124-
/**
125-
* Patches the resource and adds it to the {@link PrimaryResourceCache}.
126-
*
127-
* @param primary resource
128-
* @param freshResourceWithStatus - fresh resource with target state
129-
* @param context of reconciliation
130-
* @param cache - resource cache managed by user
131-
* @return the updated resource.
132-
* @param <P> primary resource type
133-
*/
134-
public static <P extends HasMetadata> P ssaPatchAndCacheStatus(
135-
P primary, P freshResourceWithStatus, Context<P> context, PrimaryResourceCache<P> cache) {
136-
logWarnIfResourceVersionPresent(freshResourceWithStatus);
137-
return patchAndCacheStatus(
75+
return updateAndCacheResource(
13876
primary,
139-
cache,
140-
() ->
77+
context,
78+
r -> freshResourceWithStatus,
79+
r ->
14180
context
14281
.getClient()
143-
.resource(freshResourceWithStatus)
82+
.resource(r)
14483
.subresource("status")
14584
.patch(
14685
new PatchContext.Builder()
@@ -151,75 +90,104 @@ public static <P extends HasMetadata> P ssaPatchAndCacheStatus(
15190
}
15291

15392
/**
154-
* Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache}.
155-
*
156-
* @param primary resource
157-
* @param context of reconciliation
158-
* @param cache - resource cache managed by user
159-
* @return the updated resource.
160-
* @param <P> primary resource type
161-
*/
162-
public static <P extends HasMetadata> P editAndCacheStatus(
163-
P primary, Context<P> context, PrimaryResourceCache<P> cache, UnaryOperator<P> operation) {
164-
logWarnIfResourceVersionPresent(primary);
165-
return patchAndCacheStatus(
166-
primary, cache, () -> context.getClient().resource(primary).editStatus(operation));
167-
}
168-
169-
/**
170-
* Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache}
171-
* provided.
93+
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator,
94+
* int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
17295
*
173-
* @param primary resource
96+
* @param resourceToUpdate original resource to update
17497
* @param context of reconciliation
175-
* @param cache - resource cache managed by user
176-
* @return the updated resource.
177-
* @param <P> primary resource type
98+
* @param modificationFunction modifications to make on primary
99+
* @param updateMethod the update method implementation
100+
* @param <P> primary type
101+
* @return the updated resource
178102
*/
179-
public static <P extends HasMetadata> P patchAndCacheStatus(
180-
P primary, Context<P> context, PrimaryResourceCache<P> cache) {
181-
logWarnIfResourceVersionPresent(primary);
182-
return patchAndCacheStatus(
183-
primary, cache, () -> context.getClient().resource(primary).patchStatus());
103+
public static <P extends HasMetadata> P updateAndCacheResource(
104+
P resourceToUpdate,
105+
Context<P> context,
106+
UnaryOperator<P> modificationFunction,
107+
UnaryOperator<P> updateMethod) {
108+
return updateAndCacheResource(
109+
resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY);
184110
}
185111

186112
/**
187-
* Updates the resource and adds it to the {@link PrimaryResourceCache}.
113+
* Modifies the primary using the specified modification function, then uses the modified resource
114+
* for the request to update with provided update method. As the {@code resourceVersion} field of
115+
* the modified resource is set to the value found in the specified resource to update, the update
116+
* operation will therefore use optimistic locking on the server. If the request fails on
117+
* optimistic update, we read the resource again from the K8S API server and retry the whole
118+
* process. In short, we make sure we always update the resource with optimistic locking, then we
119+
* cache the resource in an internal cache. Without further going into details, the optimistic
120+
* locking is needed so we can reliably handle the caching.
188121
*
189-
* @param primary resource
122+
* @param resourceToUpdate original resource to update
190123
* @param context of reconciliation
191-
* @param cache - resource cache managed by user
192-
* @return the updated resource.
193-
* @param <P> primary resource type
194-
*/
195-
public static <P extends HasMetadata> P updateAndCacheStatus(
196-
P primary, Context<P> context, PrimaryResourceCache<P> cache) {
197-
logWarnIfResourceVersionPresent(primary);
198-
return patchAndCacheStatus(
199-
primary, cache, () -> context.getClient().resource(primary).updateStatus());
200-
}
201-
202-
/**
203-
* Updates the resource using the user provided implementation anc caches the result.
204-
*
205-
* @param primary resource
206-
* @param cache resource cache managed by user
207-
* @param patch implementation of resource update*
208-
* @return the updated resource.
209-
* @param <P> primary resource type
124+
* @param modificationFunction modifications to make on primary
125+
* @param updateMethod the update method implementation
126+
* @param maxRetry maximum number of retries before giving up
127+
* @param <P> primary type
128+
* @return the updated resource
210129
*/
211-
public static <P extends HasMetadata> P patchAndCacheStatus(
212-
P primary, PrimaryResourceCache<P> cache, Supplier<P> patch) {
213-
var updatedResource = patch.get();
214-
cache.cacheResource(primary, updatedResource);
215-
return updatedResource;
216-
}
217-
218-
private static <P extends HasMetadata> void logWarnIfResourceVersionPresent(P primary) {
219-
if (primary.getMetadata().getResourceVersion() != null) {
220-
log.warn(
221-
"The metadata.resourceVersion of primary resource is NOT null, "
222-
+ "using optimistic locking is discouraged for this purpose. ");
130+
@SuppressWarnings("unchecked")
131+
public static <P extends HasMetadata> P updateAndCacheResource(
132+
P resourceToUpdate,
133+
Context<P> context,
134+
UnaryOperator<P> modificationFunction,
135+
UnaryOperator<P> updateMethod,
136+
int maxRetry) {
137+
138+
if (log.isDebugEnabled()) {
139+
log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate));
140+
}
141+
P modified = null;
142+
int retryIndex = 0;
143+
while (true) {
144+
try {
145+
modified = modificationFunction.apply(resourceToUpdate);
146+
modified
147+
.getMetadata()
148+
.setResourceVersion(resourceToUpdate.getMetadata().getResourceVersion());
149+
var updated = updateMethod.apply(modified);
150+
context
151+
.eventSourceRetriever()
152+
.getControllerEventSource()
153+
.handleRecentResourceUpdate(
154+
ResourceID.fromResource(resourceToUpdate), updated, resourceToUpdate);
155+
return updated;
156+
} catch (KubernetesClientException e) {
157+
log.trace("Exception during patch for resource: {}", resourceToUpdate);
158+
retryIndex++;
159+
// only retry on conflict (409) and unprocessable content (422) which
160+
// can happen if JSON Patch is not a valid request since there was
161+
// a concurrent request which already removed another finalizer:
162+
// List element removal from a list is by index in JSON Patch
163+
// so if addressing a second finalizer but first is meanwhile removed
164+
// it is a wrong request.
165+
if (e.getCode() != 409 && e.getCode() != 422) {
166+
throw e;
167+
}
168+
if (retryIndex > maxRetry) {
169+
log.warn("Retry exhausted, last desired resource: {}", modified);
170+
throw new OperatorException(
171+
"Exceeded maximum ("
172+
+ maxRetry
173+
+ ") retry attempts to patch resource: "
174+
+ ResourceID.fromResource(resourceToUpdate),
175+
e);
176+
}
177+
log.debug(
178+
"Retrying patch for resource name: {}, namespace: {}; HTTP code: {}",
179+
resourceToUpdate.getMetadata().getName(),
180+
resourceToUpdate.getMetadata().getNamespace(),
181+
e.getCode());
182+
resourceToUpdate =
183+
(P)
184+
context
185+
.getClient()
186+
.resources(resourceToUpdate.getClass())
187+
.inNamespace(resourceToUpdate.getMetadata().getNamespace())
188+
.withName(resourceToUpdate.getMetadata().getName())
189+
.get();
190+
}
223191
}
224192
}
225193
}

‎operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java

Lines changed: 0 additions & 65 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package io.javaoperatorsdk.operator.api.reconciler;
2+
3+
import java.util.function.UnaryOperator;
4+
5+
import org.junit.jupiter.api.BeforeEach;
6+
import org.junit.jupiter.api.Test;
7+
8+
import io.fabric8.kubernetes.client.KubernetesClient;
9+
import io.fabric8.kubernetes.client.KubernetesClientException;
10+
import io.fabric8.kubernetes.client.dsl.MixedOperation;
11+
import io.fabric8.kubernetes.client.dsl.Resource;
12+
import io.javaoperatorsdk.operator.OperatorException;
13+
import io.javaoperatorsdk.operator.TestUtils;
14+
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
15+
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
16+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
17+
18+
import static io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils.DEFAULT_MAX_RETRY;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.junit.jupiter.api.Assertions.assertThrows;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
26+
27+
class PrimaryUpdateAndCacheUtilsTest {
28+
29+
Context<TestCustomResource> context = mock(Context.class);
30+
KubernetesClient client = mock(KubernetesClient.class);
31+
Resource resource = mock(Resource.class);
32+
33+
@BeforeEach
34+
void setup() {
35+
when(context.getClient()).thenReturn(client);
36+
var esr = mock(EventSourceRetriever.class);
37+
when(context.eventSourceRetriever()).thenReturn(esr);
38+
when(esr.getControllerEventSource()).thenReturn(mock(ControllerEventSource.class));
39+
var mixedOp = mock(MixedOperation.class);
40+
when(client.resources(any())).thenReturn(mixedOp);
41+
when(mixedOp.inNamespace(any())).thenReturn(mixedOp);
42+
when(mixedOp.withName(any())).thenReturn(resource);
43+
when(resource.get()).thenReturn(TestUtils.testCustomResource1());
44+
}
45+
46+
@Test
47+
void handlesUpdate() {
48+
var updated =
49+
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
50+
TestUtils.testCustomResource1(),
51+
context,
52+
r -> {
53+
var res = TestUtils.testCustomResource1();
54+
// setting this to null to test if value set in the implementation
55+
res.getMetadata().setResourceVersion(null);
56+
res.getSpec().setValue("updatedValue");
57+
return res;
58+
},
59+
r -> {
60+
// checks if the resource version is set from the original resource
61+
assertThat(r.getMetadata().getResourceVersion()).isEqualTo("1");
62+
var res = TestUtils.testCustomResource1();
63+
res.setSpec(r.getSpec());
64+
res.getMetadata().setResourceVersion("2");
65+
return res;
66+
});
67+
68+
assertThat(updated.getMetadata().getResourceVersion()).isEqualTo("2");
69+
assertThat(updated.getSpec().getValue()).isEqualTo("updatedValue");
70+
}
71+
72+
@Test
73+
void retriesConflicts() {
74+
var updateOperation = mock(UnaryOperator.class);
75+
76+
when(updateOperation.apply(any()))
77+
.thenThrow(new KubernetesClientException("", 409, null))
78+
.thenReturn(TestUtils.testCustomResource1());
79+
80+
var updated =
81+
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
82+
TestUtils.testCustomResource1(),
83+
context,
84+
r -> {
85+
var res = TestUtils.testCustomResource1();
86+
res.getSpec().setValue("updatedValue");
87+
return res;
88+
},
89+
updateOperation);
90+
91+
assertThat(updated).isNotNull();
92+
verify(resource, times(1)).get();
93+
}
94+
95+
@Test
96+
void throwsIfRetryExhausted() {
97+
var updateOperation = mock(UnaryOperator.class);
98+
99+
when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
100+
101+
assertThrows(
102+
OperatorException.class,
103+
() ->
104+
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
105+
TestUtils.testCustomResource1(),
106+
context,
107+
UnaryOperator.identity(),
108+
updateOperation));
109+
verify(resource, times(DEFAULT_MAX_RETRY)).get();
110+
}
111+
}

‎operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java

Lines changed: 0 additions & 87 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache;
1+
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

33
import io.fabric8.kubernetes.api.model.Namespaced;
44
import io.fabric8.kubernetes.client.CustomResource;
@@ -8,7 +8,7 @@
88

99
@Group("sample.javaoperatorsdk")
1010
@Version("v1")
11-
@ShortNames("spc")
12-
public class StatusPatchPrimaryCacheCustomResource
13-
extends CustomResource<StatusPatchPrimaryCacheSpec, StatusPatchPrimaryCacheStatus>
11+
@ShortNames("spwl")
12+
public class StatusPatchCacheWithLockCustomResource
13+
extends CustomResource<StatusPatchCacheWithLockSpec, StatusPatchCacheWithLockStatus>
1414
implements Namespaced {}
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.javaoperatorsdk.operator.baseapi.statuscache.internal;
1+
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

33
import java.time.Duration;
44

@@ -11,19 +11,19 @@
1111
import static org.assertj.core.api.Assertions.assertThat;
1212
import static org.awaitility.Awaitility.await;
1313

14-
public class StatusPatchCacheIT {
14+
public class StatusPatchCacheWithLockIT {
1515

1616
public static final String TEST_1 = "test1";
1717

1818
@RegisterExtension
1919
LocallyRunOperatorExtension extension =
2020
LocallyRunOperatorExtension.builder()
21-
.withReconciler(StatusPatchCacheReconciler.class)
21+
.withReconciler(StatusPatchCacheWithLockReconciler.class)
2222
.build();
2323

2424
@Test
2525
void testStatusAlwaysUpToDate() {
26-
var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class);
26+
var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class);
2727

2828
extension.create(testResource());
2929

@@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() {
3939
});
4040
}
4141

42-
StatusPatchCacheCustomResource testResource() {
43-
var res = new StatusPatchCacheCustomResource();
42+
StatusPatchCacheWithLockCustomResource testResource() {
43+
var res = new StatusPatchCacheWithLockCustomResource();
4444
res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build());
45-
res.setSpec(new StatusPatchCacheSpec());
45+
res.setSpec(new StatusPatchCacheWithLockSpec());
4646
return res;
4747
}
4848
}
Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.javaoperatorsdk.operator.baseapi.statuscache.internal;
1+
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

33
import java.util.List;
44

@@ -9,18 +9,19 @@
99
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
1010
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
1111
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
12-
import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource;
1312
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
1413

1514
@ControllerConfiguration
16-
public class StatusPatchCacheReconciler implements Reconciler<StatusPatchCacheCustomResource> {
15+
public class StatusPatchCacheWithLockReconciler
16+
implements Reconciler<StatusPatchCacheWithLockCustomResource> {
1717

1818
public volatile int latestValue = 0;
1919
public volatile boolean errorPresent = false;
2020

2121
@Override
22-
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
23-
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {
22+
public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
23+
StatusPatchCacheWithLockCustomResource resource,
24+
Context<StatusPatchCacheWithLockCustomResource> context) {
2425

2526
if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) {
2627
errorPresent = true;
@@ -31,33 +32,39 @@ public UpdateControl<StatusPatchCacheCustomResource> reconcile(
3132
+ resource.getStatus().getValue());
3233
}
3334

35+
// test also resource update happening meanwhile reconciliation
36+
resource.getSpec().setCounter(resource.getSpec().getCounter() + 1);
37+
context.getClient().resource(resource).update();
38+
3439
var freshCopy = createFreshCopy(resource);
3540

3641
freshCopy
3742
.getStatus()
3843
.setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1);
3944

40-
var updated = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context);
45+
var updated =
46+
PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context);
4147
latestValue = updated.getStatus().getValue();
4248

4349
return UpdateControl.noUpdate();
4450
}
4551

4652
@Override
47-
public List<EventSource<?, StatusPatchCacheCustomResource>> prepareEventSources(
48-
EventSourceContext<StatusPatchCacheCustomResource> context) {
53+
public List<EventSource<?, StatusPatchCacheWithLockCustomResource>> prepareEventSources(
54+
EventSourceContext<StatusPatchCacheWithLockCustomResource> context) {
4955
// periodic event triggering for testing purposes
5056
return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache()));
5157
}
5258

53-
private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) {
54-
var res = new StatusPatchCacheCustomResource();
59+
private StatusPatchCacheWithLockCustomResource createFreshCopy(
60+
StatusPatchCacheWithLockCustomResource resource) {
61+
var res = new StatusPatchCacheWithLockCustomResource();
5562
res.setMetadata(
5663
new ObjectMetaBuilder()
5764
.withName(resource.getMetadata().getName())
5865
.withNamespace(resource.getMetadata().getNamespace())
5966
.build());
60-
res.setStatus(new StatusPatchCacheStatus());
67+
res.setStatus(new StatusPatchCacheWithLockStatus());
6168

6269
return res;
6370
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package io.javaoperatorsdk.operator.baseapi.statuscache.internal;
1+
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

3-
public class StatusPatchCacheSpec {
3+
public class StatusPatchCacheWithLockSpec {
44

55
private int counter = 0;
66

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.baseapi.statuscache;
2+
3+
public class StatusPatchCacheWithLockStatus {
4+
5+
private Integer value = 0;
6+
7+
public Integer getValue() {
8+
return value;
9+
}
10+
11+
public StatusPatchCacheWithLockStatus setValue(Integer value) {
12+
this.value = value;
13+
return this;
14+
}
15+
}

‎operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

‎operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

‎operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

‎operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

‎operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

‎operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)
Please sign in to comment.