Skip to content

Commit 22f03fe

Browse files
authored
fix: resource cache interface for InformerEventSource (#758)
1 parent 4611134 commit 22f03fe

File tree

12 files changed

+42
-137
lines changed

12 files changed

+42
-137
lines changed

operator-framework-core/pom.xml

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,5 @@
105105
<artifactId>awaitility</artifactId>
106106
<scope>test</scope>
107107
</dependency>
108-
<dependency>
109-
<groupId>javax.cache</groupId>
110-
<artifactId>cache-api</artifactId>
111-
<version>${jcache.version}</version>
112-
</dependency>
113-
<dependency>
114-
<groupId>com.github.ben-manes.caffeine</groupId>
115-
<artifactId>caffeine</artifactId>
116-
<scope>test</scope>
117-
</dependency>
118-
<dependency>
119-
<groupId>com.github.ben-manes.caffeine</groupId>
120-
<artifactId>jcache</artifactId>
121-
<scope>test</scope>
122-
</dependency>
123108
</dependencies>
124109
</project>

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package io.javaoperatorsdk.operator.processing.event.source;
22

3+
import java.util.Collections;
4+
import java.util.Map;
35
import java.util.Optional;
4-
5-
import javax.cache.Cache;
6-
7-
import org.slf4j.Logger;
8-
import org.slf4j.LoggerFactory;
6+
import java.util.concurrent.ConcurrentHashMap;
97

108
import io.javaoperatorsdk.operator.OperatorException;
119
import io.javaoperatorsdk.operator.processing.event.Event;
@@ -25,13 +23,9 @@
2523
*/
2624
public abstract class CachingEventSource<T> extends LifecycleAwareEventSource {
2725

28-
private static final Logger log = LoggerFactory.getLogger(CachingEventSource.class);
26+
protected Map<ResourceID, T> cache = new ConcurrentHashMap<>();
2927

30-
protected Cache<ResourceID, T> cache;
31-
32-
public CachingEventSource(Cache<ResourceID, T> cache) {
33-
this.cache = cache;
34-
}
28+
public CachingEventSource() {}
3529

3630
protected void handleDelete(ResourceID relatedResourceID) {
3731
if (!isRunning()) {
@@ -56,8 +50,8 @@ protected void handleEvent(T value, ResourceID relatedResourceID) {
5650
}
5751
}
5852

59-
public Cache<ResourceID, T> getCache() {
60-
return cache;
53+
public Map<ResourceID, T> getCache() {
54+
return Collections.unmodifiableMap(cache);
6155
}
6256

6357
public Optional<T> getCachedValue(ResourceID resourceID) {
@@ -67,6 +61,5 @@ public Optional<T> getCachedValue(ResourceID resourceID) {
6761
@Override
6862
public void stop() throws OperatorException {
6963
super.stop();
70-
cache.close();
7164
}
7265
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event.source.inbound;
22

3-
import javax.cache.Cache;
4-
53
import io.javaoperatorsdk.operator.processing.event.ResourceID;
64
import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
75

86
public class CachingInboundEventSource<T> extends CachingEventSource<T> {
97

10-
public CachingInboundEventSource(Cache<ResourceID, T> cache) {
11-
super(cache);
12-
}
13-
148
public void handleResourceEvent(T resource, ResourceID relatedResourceID) {
159
super.handleEvent(resource, relatedResourceID);
1610
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

33
import java.util.Objects;
4+
import java.util.Optional;
45
import java.util.Set;
56
import java.util.function.Function;
7+
import java.util.function.Predicate;
8+
import java.util.stream.Stream;
69

710
import org.slf4j.Logger;
811
import org.slf4j.LoggerFactory;
@@ -16,8 +19,10 @@
1619
import io.javaoperatorsdk.operator.processing.event.Event;
1720
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1821
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource;
22+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
1923

20-
public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource {
24+
public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource
25+
implements ResourceCache<T> {
2126

2227
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
2328

@@ -132,4 +137,22 @@ public T getAssociated(HasMetadata resource) {
132137
public SharedInformer<T> getSharedInformer() {
133138
return sharedInformer;
134139
}
140+
141+
@Override
142+
public Optional<T> get(ResourceID resourceID) {
143+
return Optional.ofNullable(sharedInformer.getStore()
144+
.getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null),
145+
resourceID.getName())));
146+
}
147+
148+
@Override
149+
public Stream<T> list(Predicate<T> predicate) {
150+
return sharedInformer.getStore().list().stream().filter(predicate);
151+
}
152+
153+
@Override
154+
public Stream<T> list(String namespace, Predicate<T> predicate) {
155+
return sharedInformer.getStore().list().stream()
156+
.filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v));
157+
}
135158
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import java.util.concurrent.ConcurrentHashMap;
88
import java.util.function.Predicate;
99

10-
import javax.cache.Cache;
11-
1210
import org.slf4j.Logger;
1311
import org.slf4j.LoggerFactory;
1412

@@ -44,14 +42,13 @@ public class PerResourcePollingEventSource<T, R extends HasMetadata>
4442
private final long period;
4543

4644
public PerResourcePollingEventSource(ResourceSupplier<T, R> resourceSupplier,
47-
ResourceCache<R> resourceCache, long period, Cache<ResourceID, T> cache) {
48-
this(resourceSupplier, resourceCache, period, cache, null);
45+
ResourceCache<R> resourceCache, long period) {
46+
this(resourceSupplier, resourceCache, period, null);
4947
}
5048

5149
public PerResourcePollingEventSource(ResourceSupplier<T, R> resourceSupplier,
52-
ResourceCache<R> resourceCache, long period, Cache<ResourceID, T> cache,
50+
ResourceCache<R> resourceCache, long period,
5351
Predicate<R> registerPredicate) {
54-
super(cache);
5552
this.resourceSupplier = resourceSupplier;
5653
this.resourceCache = resourceCache;
5754
this.period = period;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
import java.util.*;
44
import java.util.function.Supplier;
5-
import java.util.stream.StreamSupport;
6-
7-
import javax.cache.Cache;
85

96
import org.slf4j.Logger;
107
import org.slf4j.LoggerFactory;
@@ -22,8 +19,7 @@ public class PollingEventSource<T> extends CachingEventSource<T> {
2219
private final long period;
2320

2421
public PollingEventSource(Supplier<Map<ResourceID, T>> supplier,
25-
long period, Cache<ResourceID, T> cache) {
26-
super(cache);
22+
long period) {
2723
this.supplierToPoll = supplier;
2824
this.period = period;
2925
}
@@ -46,8 +42,8 @@ public void run() {
4642
protected void getStateAndFillCache() {
4743
var values = supplierToPoll.get();
4844
values.forEach((k, v) -> super.handleEvent(v, k));
49-
StreamSupport.stream(cache.spliterator(), false)
50-
.filter(e -> !values.containsKey(e.getKey())).map(Cache.Entry::getKey)
45+
cache.keySet().stream()
46+
.filter(e -> !values.containsKey(e))
5147
.forEach(super::handleDelete);
5248
}
5349

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSourceTest.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event.source;
22

3-
import javax.cache.Cache;
4-
import javax.cache.CacheManager;
5-
import javax.cache.configuration.MutableConfiguration;
6-
import javax.cache.spi.CachingProvider;
7-
83
import org.junit.jupiter.api.AfterEach;
94
import org.junit.jupiter.api.BeforeEach;
105
import org.junit.jupiter.api.Test;
116

127
import io.javaoperatorsdk.operator.processing.event.Event;
138
import io.javaoperatorsdk.operator.processing.event.EventHandler;
14-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
15-
16-
import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;
179

1810
import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*;
1911
import static org.assertj.core.api.Assertions.assertThat;
@@ -22,16 +14,11 @@
2214
class CachingEventSourceTest {
2315

2416
private CachingEventSource<SampleExternalResource> cachingEventSource;
25-
private Cache<ResourceID, SampleExternalResource> cache;
2617
private EventHandler eventHandlerMock = mock(EventHandler.class);
2718

2819
@BeforeEach
2920
public void setup() {
30-
CachingProvider cachingProvider = new CaffeineCachingProvider();
31-
CacheManager cacheManager = cachingProvider.getCacheManager();
32-
cache = cacheManager.createCache("test-caching", new MutableConfiguration<>());
33-
34-
cachingEventSource = new SimpleCachingEventSource(cache);
21+
cachingEventSource = new SimpleCachingEventSource();
3522
cachingEventSource.setEventHandler(eventHandlerMock);
3623
cachingEventSource.start();
3724
}
@@ -89,9 +76,6 @@ public void noEventOnDeleteIfResourceWasNotInCacheBefore() {
8976

9077
public static class SimpleCachingEventSource
9178
extends CachingEventSource<SampleExternalResource> {
92-
public SimpleCachingEventSource(Cache<ResourceID, SampleExternalResource> cache) {
93-
super(cache);
94-
}
9579
}
9680

9781
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@
22

33
import java.util.Optional;
44

5-
import javax.cache.Cache;
6-
import javax.cache.CacheManager;
7-
import javax.cache.configuration.MutableConfiguration;
8-
import javax.cache.spi.CachingProvider;
9-
105
import org.junit.jupiter.api.BeforeEach;
116
import org.junit.jupiter.api.Test;
127

@@ -17,10 +12,7 @@
1712
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
1813
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
1914

20-
import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;
21-
2215
import static org.assertj.core.api.Assertions.assertThat;
23-
import static org.junit.jupiter.api.Assertions.*;
2416
import static org.mockito.ArgumentMatchers.any;
2517
import static org.mockito.ArgumentMatchers.eq;
2618
import static org.mockito.Mockito.*;
@@ -32,22 +24,17 @@ class PerResourcePollingEventSourceTest {
3224
private PerResourcePollingEventSource.ResourceSupplier<SampleExternalResource, TestCustomResource> supplier =
3325
mock(PerResourcePollingEventSource.ResourceSupplier.class);
3426
private ResourceCache<TestCustomResource> resourceCache = mock(ResourceCache.class);
35-
private Cache<ResourceID, SampleExternalResource> cache;
3627
private EventHandler eventHandler = mock(EventHandler.class);
3728
private TestCustomResource testCustomResource = TestUtils.testCustomResource();
3829

3930
@BeforeEach
4031
public void setup() {
41-
CachingProvider cachingProvider = new CaffeineCachingProvider();
42-
CacheManager cacheManager = cachingProvider.getCacheManager();
43-
cache = cacheManager.createCache("test-caching", new MutableConfiguration<>());
44-
4532
when(resourceCache.get(any())).thenReturn(Optional.of(testCustomResource));
4633
when(supplier.getResources(any()))
4734
.thenReturn(Optional.of(SampleExternalResource.testResource1()));
4835

4936
pollingEventSource =
50-
new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, cache);
37+
new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD);
5138
pollingEventSource.setEventHandler(eventHandler);
5239
}
5340

@@ -63,7 +50,7 @@ public void pollsTheResourceAfterAwareOfIt() throws InterruptedException {
6350

6451
@Test
6552
public void registeringTaskOnAPredicate() throws InterruptedException {
66-
pollingEventSource = new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, cache,
53+
pollingEventSource = new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD,
6754
testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1);
6855
pollingEventSource.setEventHandler(eventHandler);
6956
pollingEventSource.start();

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@
44
import java.util.Map;
55
import java.util.function.Supplier;
66

7-
import javax.cache.Cache;
8-
import javax.cache.CacheManager;
9-
import javax.cache.configuration.MutableConfiguration;
10-
import javax.cache.spi.CachingProvider;
11-
127
import org.junit.jupiter.api.AfterEach;
138
import org.junit.jupiter.api.BeforeEach;
149
import org.junit.jupiter.api.Test;
@@ -17,25 +12,18 @@
1712
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1813
import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;
1914

20-
import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;
21-
2215
import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*;
2316
import static org.mockito.Mockito.*;
2417

2518
class PollingEventSourceTest {
2619

2720
private PollingEventSource<SampleExternalResource> pollingEventSource;
2821
private Supplier<Map<ResourceID, SampleExternalResource>> supplier = mock(Supplier.class);
29-
private Cache<ResourceID, SampleExternalResource> cache;
3022
private EventHandler eventHandler = mock(EventHandler.class);
3123

3224
@BeforeEach
3325
public void setup() {
34-
CachingProvider cachingProvider = new CaffeineCachingProvider();
35-
CacheManager cacheManager = cachingProvider.getCacheManager();
36-
cache = cacheManager.createCache("test-caching", new MutableConfiguration<>());
37-
38-
pollingEventSource = new PollingEventSource<>(supplier, 50, cache);
26+
pollingEventSource = new PollingEventSource<>(supplier, 50);
3927
pollingEventSource.setEventHandler(eventHandler);
4028
}
4129

pom.xml

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@
7070
<formatter-maven-plugin.version>2.17.1</formatter-maven-plugin.version>
7171
<directory-maven-plugin.version>1.0</directory-maven-plugin.version>
7272
<impsort-maven-plugin.version>1.6.2</impsort-maven-plugin.version>
73-
<jcache.version>1.1.1</jcache.version>
74-
<caffein.version>3.0.4</caffein.version>
7573
</properties>
7674

7775
<modules>
@@ -170,21 +168,6 @@
170168
<artifactId>operator-framework</artifactId>
171169
<version>${project.version}</version>
172170
</dependency>
173-
<dependency>
174-
<groupId>javax.cache</groupId>
175-
<artifactId>cache-api</artifactId>
176-
<version>${jcache.version}</version>
177-
</dependency>
178-
<dependency>
179-
<groupId>com.github.ben-manes.caffeine</groupId>
180-
<artifactId>caffeine</artifactId>
181-
<version>${caffein.version}</version>
182-
</dependency>
183-
<dependency>
184-
<groupId>com.github.ben-manes.caffeine</groupId>
185-
<artifactId>jcache</artifactId>
186-
<version>${caffein.version}</version>
187-
</dependency>
188171
</dependencies>
189172
</dependencyManagement>
190173

sample-operators/mysql-schema/pom.xml

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,6 @@
6767
<artifactId>jackson-dataformat-yaml</artifactId>
6868
<version>2.13.0</version>
6969
</dependency>
70-
<dependency>
71-
<groupId>javax.cache</groupId>
72-
<artifactId>cache-api</artifactId>
73-
<version>${jcache.version}</version>
74-
</dependency>
75-
<dependency>
76-
<groupId>com.github.ben-manes.caffeine</groupId>
77-
<artifactId>caffeine</artifactId>
78-
</dependency>
79-
<dependency>
80-
<groupId>com.github.ben-manes.caffeine</groupId>
81-
<artifactId>jcache</artifactId>
82-
</dependency>
8370
</dependencies>
8471

8572
<build>

0 commit comments

Comments
 (0)