Skip to content

Commit 9629820

Browse files
committed
feat: use a predicate to select the custom resource for which a reconcile should be triggered #430
1 parent c17663b commit 9629820

File tree

8 files changed

+226
-4
lines changed

8 files changed

+226
-4
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
import com.fasterxml.jackson.core.JsonProcessingException;
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import io.fabric8.kubernetes.client.CustomResource;
9+
import java.util.List;
910
import java.util.Optional;
11+
import java.util.Set;
1012
import java.util.concurrent.ConcurrentHashMap;
1113
import java.util.concurrent.ConcurrentMap;
1214
import java.util.concurrent.locks.Lock;
1315
import java.util.concurrent.locks.ReentrantLock;
1416
import java.util.function.Predicate;
17+
import java.util.stream.Collectors;
1518
import org.slf4j.Logger;
1619
import org.slf4j.LoggerFactory;
1720

@@ -64,6 +67,30 @@ public Optional<CustomResource> getLatestResource(String uuid) {
6467
return Optional.ofNullable(resources.get(uuid)).map(this::clone);
6568
}
6669

70+
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
71+
try {
72+
lock.lock();
73+
return resources.values().stream()
74+
.filter(selector)
75+
.map(this::clone)
76+
.collect(Collectors.toList());
77+
} finally {
78+
lock.unlock();
79+
}
80+
}
81+
82+
public Set<String> getLatestResourcesUids(Predicate<CustomResource> selector) {
83+
try {
84+
lock.lock();
85+
return resources.values().stream()
86+
.filter(selector)
87+
.map(r -> r.getMetadata().getUid())
88+
.collect(Collectors.toSet());
89+
} finally {
90+
lock.unlock();
91+
}
92+
}
93+
6794
private CustomResource clone(CustomResource customResource) {
6895
try {
6996
return objectMapper.readValue(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import java.util.HashMap;
2020
import java.util.HashSet;
2121
import java.util.Map;
22+
import java.util.Objects;
2223
import java.util.Optional;
2324
import java.util.Set;
2425
import java.util.concurrent.ScheduledThreadPoolExecutor;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.locks.ReentrantLock;
28+
import java.util.function.Predicate;
2729
import org.slf4j.Logger;
2830
import org.slf4j.LoggerFactory;
2931

@@ -108,8 +110,19 @@ public void handleEvent(Event event) {
108110
try {
109111
lock.lock();
110112
log.debug("Received event: {}", event);
111-
eventBuffer.addEvent(event);
112-
executeBufferedEvents(event.getRelatedCustomResourceUid());
113+
114+
Predicate<CustomResource> selector = event.getCustomResourcesSelector();
115+
if (selector == null) {
116+
final String uid =
117+
Objects.requireNonNull(event.getRelatedCustomResourceUid(), "CustomResource UID");
118+
119+
selector = customResource -> Objects.equals(uid, customResource.getMetadata().getUid());
120+
}
121+
122+
for (String uid : eventSourceManager.getLatestResourceUids(selector)) {
123+
eventBuffer.addEvent(uid, event);
124+
executeBufferedEvents(uid);
125+
}
113126
} finally {
114127
lock.unlock();
115128
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,22 @@
66
import java.util.HashMap;
77
import java.util.List;
88
import java.util.Map;
9+
import java.util.Objects;
910

1011
class EventBuffer {
1112

1213
private final Map<String, List<Event>> events = new HashMap<>();
1314

15+
/** @deprecated use {@link #addEvent(String, Event)} */
16+
@Deprecated
1417
public void addEvent(Event event) {
15-
String uid = event.getRelatedCustomResourceUid();
18+
addEvent(event.getRelatedCustomResourceUid(), event);
19+
}
20+
21+
public void addEvent(String uid, Event event) {
22+
Objects.requireNonNull(uid, "uid");
23+
Objects.requireNonNull(event, "event");
24+
1625
List<Event> crEvents = events.computeIfAbsent(uid, (id) -> new ArrayList<>(1));
1726
crEvents.add(event);
1827
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
6+
@SuppressWarnings("rawtypes")
37
public abstract class AbstractEvent implements Event {
48

59
private final String relatedCustomResourceUid;
6-
10+
private final Predicate<CustomResource> customResourcesSelector;
711
private final EventSource eventSource;
812

913
public AbstractEvent(String relatedCustomResourceUid, EventSource eventSource) {
1014
this.relatedCustomResourceUid = relatedCustomResourceUid;
15+
this.customResourcesSelector = null;
16+
this.eventSource = eventSource;
17+
}
18+
19+
public AbstractEvent(Predicate<CustomResource> customResourcesSelector, EventSource eventSource) {
20+
this.relatedCustomResourceUid = null;
21+
this.customResourcesSelector = customResourcesSelector;
1122
this.eventSource = eventSource;
1223
}
1324

@@ -16,6 +27,10 @@ public String getRelatedCustomResourceUid() {
1627
return relatedCustomResourceUid;
1728
}
1829

30+
public Predicate<CustomResource> getCustomResourcesSelector() {
31+
return customResourcesSelector;
32+
}
33+
1934
@Override
2035
public EventSource getEventSource() {
2136
return eventSource;
@@ -27,6 +42,8 @@ public String toString() {
2742
+ this.getClass().getName()
2843
+ ", relatedCustomResourceUid="
2944
+ relatedCustomResourceUid
45+
+ ", customResourcesSelector="
46+
+ customResourcesSelector
3047
+ ", eventSource="
3148
+ eventSource
3249
+ " }";

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1313
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
1414
import java.util.Collections;
15+
import java.util.List;
1516
import java.util.Map;
1617
import java.util.Objects;
1718
import java.util.Optional;
19+
import java.util.Set;
1820
import java.util.concurrent.ConcurrentHashMap;
1921
import java.util.concurrent.locks.ReentrantLock;
2022
import java.util.function.Predicate;
@@ -163,6 +165,16 @@ public Optional<CustomResource> getLatestResource(String customResourceUid) {
163165
return getCache().getLatestResource(customResourceUid);
164166
}
165167

168+
// todo: remove
169+
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
170+
return getCache().getLatestResources(selector);
171+
}
172+
173+
// todo: remove
174+
public Set<String> getLatestResourceUids(Predicate<CustomResource> selector) {
175+
return getCache().getLatestResourcesUids(selector);
176+
}
177+
166178
// todo: remove
167179
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) {
168180
getCache().cacheResource(resource, predicate);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
36
public interface Event {
47

8+
/**
9+
* @return the UID of the the {@link CustomResource} for which a reconcile loop should be
10+
* triggered.
11+
* @deprecated use {@link #getCustomResourcesSelector()}
12+
*/
13+
@Deprecated
514
String getRelatedCustomResourceUid();
615

16+
/**
17+
* The selector used to determine the {@link CustomResource} for which a reconcile loop should be
18+
* triggered.
19+
*/
20+
Predicate<CustomResource> getCustomResourcesSelector();
21+
22+
/** @return the {@link EventSource} that has generated the event. */
723
EventSource getEventSource();
824
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package io.javaoperatorsdk.operator.processing;
2+
3+
import static io.javaoperatorsdk.operator.TestUtils.testCustomResource;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
import static org.mockito.Mockito.any;
6+
import static org.mockito.Mockito.doAnswer;
7+
import static org.mockito.Mockito.doCallRealMethod;
8+
import static org.mockito.Mockito.mock;
9+
import static org.mockito.Mockito.timeout;
10+
import static org.mockito.Mockito.verify;
11+
import static org.mockito.Mockito.when;
12+
13+
import io.fabric8.kubernetes.client.Watcher;
14+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
15+
import io.javaoperatorsdk.operator.processing.event.AbstractEvent;
16+
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
17+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
18+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
19+
import java.util.Objects;
20+
import java.util.UUID;
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
23+
import org.mockito.ArgumentCaptor;
24+
25+
class CustomResourceSelectorTest {
26+
27+
public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250;
28+
public static final int SEPARATE_EXECUTION_TIMEOUT = 450;
29+
30+
private final EventDispatcher eventDispatcherMock = mock(EventDispatcher.class);
31+
private final CustomResourceCache customResourceCache = new CustomResourceCache();
32+
33+
private final DefaultEventSourceManager defaultEventSourceManagerMock =
34+
mock(DefaultEventSourceManager.class);
35+
36+
private final DefaultEventHandler defaultEventHandler =
37+
new DefaultEventHandler(
38+
eventDispatcherMock,
39+
"Test",
40+
null,
41+
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
42+
43+
@BeforeEach
44+
public void setup() {
45+
defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock);
46+
47+
// todo: remove
48+
when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache);
49+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());
50+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResources(any());
51+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResourceUids(any());
52+
doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any());
53+
doAnswer(
54+
invocation -> {
55+
final var resourceId = (String) invocation.getArgument(0);
56+
customResourceCache.cleanup(resourceId);
57+
return null;
58+
})
59+
.when(defaultEventSourceManagerMock)
60+
.cleanup(any());
61+
}
62+
63+
@Test
64+
public void dispatchEventsWithPredicate() {
65+
TestCustomResource cr1 = testCustomResource(UUID.randomUUID().toString());
66+
cr1.getSpec().setValue("1");
67+
TestCustomResource cr2 = testCustomResource(UUID.randomUUID().toString());
68+
cr2.getSpec().setValue("2");
69+
TestCustomResource cr3 = testCustomResource(UUID.randomUUID().toString());
70+
cr3.getSpec().setValue("3");
71+
72+
customResourceCache.cacheResource(cr1);
73+
customResourceCache.cacheResource(cr2);
74+
customResourceCache.cacheResource(cr3);
75+
76+
defaultEventHandler.handleEvent(
77+
new AbstractEvent(
78+
c -> {
79+
var tcr = ((TestCustomResource) c);
80+
return Objects.equals("1", tcr.getSpec().getValue())
81+
|| Objects.equals("3", tcr.getSpec().getValue());
82+
},
83+
null) {});
84+
85+
verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2))
86+
.handleExecution(any());
87+
88+
waitMinimalTime();
89+
90+
ArgumentCaptor<ExecutionScope> executionScopeArgumentCaptor =
91+
ArgumentCaptor.forClass(ExecutionScope.class);
92+
93+
verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2))
94+
.handleExecution(executionScopeArgumentCaptor.capture());
95+
96+
assertThat(executionScopeArgumentCaptor.getAllValues())
97+
.hasSize(2)
98+
.allSatisfy(
99+
s -> {
100+
assertThat(s.getEvents()).isNotEmpty().hasOnlyElementsOfType(AbstractEvent.class);
101+
assertThat(s)
102+
.satisfiesAnyOf(
103+
e -> Objects.equals(cr1.getMetadata().getUid(), e.getCustomResourceUid()),
104+
e -> Objects.equals(cr3.getMetadata().getUid(), e.getCustomResourceUid()));
105+
});
106+
}
107+
108+
private void waitMinimalTime() {
109+
try {
110+
Thread.sleep(1000);
111+
} catch (InterruptedException e) {
112+
throw new IllegalStateException(e);
113+
}
114+
}
115+
116+
private CustomResourceEvent prepareCREvent() {
117+
return prepareCREvent(UUID.randomUUID().toString());
118+
}
119+
120+
private CustomResourceEvent prepareCREvent(String uid) {
121+
TestCustomResource customResource = testCustomResource(uid);
122+
customResourceCache.cacheResource(customResource);
123+
return new CustomResourceEvent(Watcher.Action.MODIFIED, customResource, null);
124+
}
125+
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public void setup() {
6969
// todo: remove
7070
when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache);
7171
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());
72+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());
73+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResources(any());
74+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResourceUids(any());
7275
doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any());
7376
doAnswer(
7477
invocation -> {

0 commit comments

Comments
 (0)