Skip to content

fix: polling event source improvements #901

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,35 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;

/**
* <p>
* Pols resource (on contrary to {@link PerResourcePollingEventSource}) not per resource bases but
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Pols resource (on contrary to {@link PerResourcePollingEventSource}) not per resource bases but
* Polls resource (on contrary to {@link PerResourcePollingEventSource}) not per resource bases but

* instead to calls supplier periodically and independently of the number of state of custom
* resources managed by the operator. It is called on start (synced). This means that when the
* reconciler first time executed on startup a poll already happened before. So if the cache does
* not contain the target resource it means it is not created yet or was deleted while an operator
* was not running.
* </p>
* <p>
* Another caveat with this is if the cached object is checked in the reconciler and created since
* not in the cache it should be manually added to the cache, since it can happen that the
* reconciler is triggered before the cache is propagated with the new resource from a scheduled
* execution. See {@link PollingEventSource##put(ResourceID, Object)}.
* </p>
* So the generic workflow in reconciler should be:
*
* <ul>
* <li>Check if the cache contains the resource.</li>
* <li>If cache contains the resource reconcile it - compare with target state, update if necessary
* </li>
* <li>if cache not contains the resource create it.</li>
* <li>If the resource was created or updated, put the new version of the resource manually to the
* cache.</li>
* </ul>
*
* @param <T> type of the polled resource
* @param <P> primary resource type
*/
public class PollingEventSource<T, P extends HasMetadata> extends CachingEventSource<T, P> {

private static final Logger log = LoggerFactory.getLogger(PollingEventSource.class);
Expand All @@ -29,6 +58,7 @@ public PollingEventSource(Supplier<Map<ResourceID, T>> supplier,
@Override
public void start() throws OperatorException {
super.start();
getStateAndFillCache();
timer.schedule(new TimerTask() {
@Override
public void run() {
Expand All @@ -47,6 +77,10 @@ protected void getStateAndFillCache() {
cache.keys().filter(e -> !values.containsKey(e)).forEach(super::handleDelete);
}

public void put(ResourceID key, T resource) {
cache.put(key, resource);
}

@Override
public void stop() throws OperatorException {
super.stop();
Expand All @@ -61,15 +95,7 @@ public void stop() throws OperatorException {
*/
@Override
public Optional<T> getAssociated(P primary) {
return getValueFromCacheOrSupplier(ResourceID.fromResource(primary));
return getCachedValue(ResourceID.fromResource(primary));
}

public Optional<T> getValueFromCacheOrSupplier(ResourceID resourceID) {
var resource = getCachedValue(resourceID);
if (resource.isPresent()) {
return resource;
}
getStateAndFillCache();
return getCachedValue(resourceID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ class PollingEventSourceTest
AbstractEventSourceTestBase<PollingEventSource<SampleExternalResource, HasMetadata>, EventHandler> {

private Supplier<Map<ResourceID, SampleExternalResource>> supplier = mock(Supplier.class);
private PollingEventSource<SampleExternalResource, HasMetadata> pollingEventSource =
new PollingEventSource<>(supplier, 50, SampleExternalResource.class);

@BeforeEach
public void setup() {
setUpSource(new PollingEventSource<>(supplier, 50, SampleExternalResource.class));
setUpSource(pollingEventSource, false);
}

@Test
public void pollsAndProcessesEvents() throws InterruptedException {
when(supplier.get()).thenReturn(testResponseWithTwoValues());

pollingEventSource.start();
Thread.sleep(100);

verify(eventHandler, times(2)).handleEvent(any());
Expand All @@ -40,7 +42,7 @@ public void pollsAndProcessesEvents() throws InterruptedException {
public void propagatesEventForRemovedResources() throws InterruptedException {
when(supplier.get()).thenReturn(testResponseWithTwoValues())
.thenReturn(testResponseWithOneValue());

pollingEventSource.start();
Thread.sleep(150);

verify(eventHandler, times(3)).handleEvent(any());
Expand All @@ -49,7 +51,7 @@ public void propagatesEventForRemovedResources() throws InterruptedException {
@Test
public void doesNotPropagateEventIfResourceNotChanged() throws InterruptedException {
when(supplier.get()).thenReturn(testResponseWithTwoValues());

pollingEventSource.start();
Thread.sleep(250);

verify(eventHandler, times(2)).handleEvent(any());
Expand Down