Skip to content

feat: add sample to primary to secondary missing issue #1847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void readsSecondaryInManyToOneCases() throws InterruptedException {
.isEqualTo(1));
}

Job job() {
public static Job job() {
var job = new Job();
job.setMetadata(new ObjectMetaBuilder()
.withName("job1")
Expand All @@ -48,7 +48,7 @@ Job job() {
return job;
}

Cluster cluster() {
public static Cluster cluster() {
Cluster cluster = new Cluster();
cluster.setMetadata(new ObjectMetaBuilder()
.withName(CLUSTER_NAME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.javaoperatorsdk.operator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.primarytosecondary.Cluster;
import io.javaoperatorsdk.operator.sample.primarytosecondary.JobReconciler;

import static io.javaoperatorsdk.operator.PrimaryToSecondaryIT.cluster;
import static io.javaoperatorsdk.operator.PrimaryToSecondaryIT.job;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* The intention with this IT is to show the use cases why the PrimaryToSecondary Mapper is needed,
* and the situation when it is not working.
*/
class PrimaryToSecondaryMissingIT {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add some description that this unit test is to showcase the role of PtoS mapper and that it it not meant to be used as reusable code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, will add, thx!!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
.withAdditionalCustomResourceDefinition(Cluster.class)
.withReconciler(new JobReconciler(false))
.build();

@Test
void missingPrimaryToSecondaryCausesIssueAccessingSecondary() throws InterruptedException {
var reconciler = operator.getReconcilerOfType(JobReconciler.class);
operator.create(cluster());
Thread.sleep(300);
operator.create(job());

await().untilAsserted(() -> {
assertThat(reconciler.isErrorOccurred()).isTrue();
assertThat(reconciler.getNumberOfExecutions()).isZero();
});
}

@Test
void accessingDirectlyTheCacheWorksWithoutPToSMapper() throws InterruptedException {
var reconciler = operator.getReconcilerOfType(JobReconciler.class);
reconciler.setGetResourceDirectlyFromCache(true);
operator.create(cluster());
Thread.sleep(300);
operator.create(job());

await().untilAsserted(() -> {
assertThat(reconciler.isErrorOccurred()).isFalse();
assertThat(reconciler.getNumberOfExecutions()).isPositive();
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,49 @@
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;

/**
* This reconciler used in integration tests to show the cases when PrimaryToSecondaryMapper is
* needed, and to show the use cases when some mechanisms would not work without that. It's not
* intended to be a reusable code as it is, rather serves for deeper understanding of the problem.
*/
@ControllerConfiguration()
public class JobReconciler
implements Reconciler<Job>, EventSourceInitializer<Job> {
implements Reconciler<Job>, EventSourceInitializer<Job>, ErrorStatusHandler<Job> {

private static final String JOB_CLUSTER_INDEX = "job-cluster-index";

private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

private final boolean addPrimaryToSecondaryMapper;
private boolean getResourceDirectlyFromCache = false;
private volatile boolean errorOccurred;

public JobReconciler() {
this(true);
}

public JobReconciler(boolean addPrimaryToSecondaryMapper) {
this.addPrimaryToSecondaryMapper = addPrimaryToSecondaryMapper;
}

@Override
public UpdateControl<Job> reconcile(
Job resource, Context<Job> context) {

context.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
if (!getResourceDirectlyFromCache) {
// this is only possible when there is primary to secondary mapper
context.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
} else {
// reading the resource from cache as alternative, works without primary to secondary mapper
var informerEventSource = (InformerEventSource<Cluster, Job>) context.eventSourceRetriever()
.getResourceEventSourceFor(Cluster.class);
informerEventSource
.get(new ResourceID(resource.getSpec().getClusterName(),
resource.getMetadata().getNamespace()))
.orElseThrow(
() -> new IllegalStateException("Secondary resource cannot be read from cache"));
}
numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
}
Expand All @@ -36,20 +65,22 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<Job> cont
context.getPrimaryCache().addIndexer(JOB_CLUSTER_INDEX, (job -> List
.of(indexKey(job.getSpec().getClusterName(), job.getMetadata().getNamespace()))));

InformerConfiguration<Cluster> informerConfiguration =
InformerConfiguration.InformerConfigurationBuilder<Cluster> informerConfiguration =
InformerConfiguration.from(Cluster.class, context)
.withSecondaryToPrimaryMapper(cluster -> context.getPrimaryCache()
.byIndex(JOB_CLUSTER_INDEX, indexKey(cluster.getMetadata().getName(),
cluster.getMetadata().getNamespace()))
.stream().map(ResourceID::fromResource).collect(Collectors.toSet()))
.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<Job>) primary -> Set.of(new ResourceID(
primary.getSpec().getClusterName(), primary.getMetadata().getNamespace())))
.withNamespacesInheritedFromController(context)
.build();
.withNamespacesInheritedFromController(context);

if (addPrimaryToSecondaryMapper) {
informerConfiguration = informerConfiguration.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<Job>) primary -> Set.of(new ResourceID(
primary.getSpec().getClusterName(), primary.getMetadata().getNamespace())));
}

return EventSourceInitializer
.nameEventSources(new InformerEventSource<>(informerConfiguration, context));
.nameEventSources(new InformerEventSource<>(informerConfiguration.build(), context));
}

private String indexKey(String clusterName, String namespace) {
Expand All @@ -59,4 +90,20 @@ private String indexKey(String clusterName, String namespace) {
public int getNumberOfExecutions() {
return numberOfExecutions.get();
}

@Override
public ErrorStatusUpdateControl<Job> updateErrorStatus(Job resource, Context<Job> context,
Exception e) {
errorOccurred = true;
return ErrorStatusUpdateControl.noStatusUpdate();
}

public boolean isErrorOccurred() {
return errorOccurred;
}

public JobReconciler setGetResourceDirectlyFromCache(boolean getResourceDirectlyFromCache) {
this.getResourceDirectlyFromCache = getResourceDirectlyFromCache;
return this;
}
}