diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java index 8701f31182..55609c2918 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java @@ -11,6 +11,35 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; +/** + *

+ * Pols resource (on contrary to {@link PerResourcePollingEventSource}) not per resource bases but + * instead to calls supplier periodically and independently of the number of state of custom + * resources managed by the operator. It is called on start (synced). This means that when the + * reconciler first time executed on startup a poll already happened before. So if the cache does + * not contain the target resource it means it is not created yet or was deleted while an operator + * was not running. + *

+ *

+ * Another caveat with this is if the cached object is checked in the reconciler and created since + * not in the cache it should be manually added to the cache, since it can happen that the + * reconciler is triggered before the cache is propagated with the new resource from a scheduled + * execution. See {@link PollingEventSource##put(ResourceID, Object)}. + *

+ * So the generic workflow in reconciler should be: + * + * + * + * @param type of the polled resource + * @param

primary resource type + */ public class PollingEventSource extends CachingEventSource { private static final Logger log = LoggerFactory.getLogger(PollingEventSource.class); @@ -29,6 +58,7 @@ public PollingEventSource(Supplier> supplier, @Override public void start() throws OperatorException { super.start(); + getStateAndFillCache(); timer.schedule(new TimerTask() { @Override public void run() { @@ -47,6 +77,10 @@ protected void getStateAndFillCache() { cache.keys().filter(e -> !values.containsKey(e)).forEach(super::handleDelete); } + public void put(ResourceID key, T resource) { + cache.put(key, resource); + } + @Override public void stop() throws OperatorException { super.stop(); @@ -61,15 +95,7 @@ public void stop() throws OperatorException { */ @Override public Optional getAssociated(P primary) { - return getValueFromCacheOrSupplier(ResourceID.fromResource(primary)); + return getCachedValue(ResourceID.fromResource(primary)); } - public Optional getValueFromCacheOrSupplier(ResourceID resourceID) { - var resource = getCachedValue(resourceID); - if (resource.isPresent()) { - return resource; - } - getStateAndFillCache(); - return getCachedValue(resourceID); - } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java index db1ea74aab..d866791147 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java @@ -21,16 +21,18 @@ class PollingEventSourceTest AbstractEventSourceTestBase, EventHandler> { private Supplier> supplier = mock(Supplier.class); + private PollingEventSource pollingEventSource = + new PollingEventSource<>(supplier, 50, SampleExternalResource.class); @BeforeEach public void setup() { - setUpSource(new PollingEventSource<>(supplier, 50, SampleExternalResource.class)); + setUpSource(pollingEventSource, false); } @Test public void pollsAndProcessesEvents() throws InterruptedException { when(supplier.get()).thenReturn(testResponseWithTwoValues()); - + pollingEventSource.start(); Thread.sleep(100); verify(eventHandler, times(2)).handleEvent(any()); @@ -40,7 +42,7 @@ public void pollsAndProcessesEvents() throws InterruptedException { public void propagatesEventForRemovedResources() throws InterruptedException { when(supplier.get()).thenReturn(testResponseWithTwoValues()) .thenReturn(testResponseWithOneValue()); - + pollingEventSource.start(); Thread.sleep(150); verify(eventHandler, times(3)).handleEvent(any()); @@ -49,7 +51,7 @@ public void propagatesEventForRemovedResources() throws InterruptedException { @Test public void doesNotPropagateEventIfResourceNotChanged() throws InterruptedException { when(supplier.get()).thenReturn(testResponseWithTwoValues()); - + pollingEventSource.start(); Thread.sleep(250); verify(eventHandler, times(2)).handleEvent(any());