26
26
public abstract class CachingEventSource <T , P extends HasMetadata >
27
27
extends AbstractResourceEventSource <P , T > implements Cache <T > {
28
28
29
- protected UpdatableCache <T > cache ;
29
+ protected UpdatableCache <Optional < T > > cache ;
30
30
31
31
public CachingEventSource (Class <T > resourceClass ) {
32
32
super (resourceClass );
@@ -35,7 +35,7 @@ public CachingEventSource(Class<T> resourceClass) {
35
35
36
36
@ Override
37
37
public Optional <T > get (ResourceID resourceID ) {
38
- return cache .get (resourceID );
38
+ return cache .get (resourceID ). orElse ( Optional . empty ()) ;
39
39
}
40
40
41
41
@ Override
@@ -50,17 +50,24 @@ public Stream<ResourceID> keys() {
50
50
51
51
@ Override
52
52
public Stream <T > list (Predicate <T > predicate ) {
53
- return cache .list (predicate );
53
+ return cache .list (c -> c . isPresent () && predicate . test ( c . get ())). map ( c -> c . get () );
54
54
}
55
55
56
56
protected void handleDelete (ResourceID relatedResourceID ) {
57
57
if (!isRunning ()) {
58
58
return ;
59
59
}
60
+
61
+
60
62
var cachedValue = cache .get (relatedResourceID );
61
- cache .remove (relatedResourceID );
63
+ cache .put (relatedResourceID , Optional . empty () );
62
64
// we only propagate event if the resource was previously in cache
65
+
63
66
if (cachedValue .isPresent ()) {
67
+ if (cachedValue .get ().isPresent ()) {
68
+ getEventHandler ().handleEvent (new Event (relatedResourceID ));
69
+ }
70
+ } else {
64
71
getEventHandler ().handleEvent (new Event (relatedResourceID ));
65
72
}
66
73
}
@@ -69,19 +76,19 @@ protected void handleEvent(T value, ResourceID relatedResourceID) {
69
76
if (!isRunning ()) {
70
77
return ;
71
78
}
72
- var cachedValue = cache .get (relatedResourceID );
79
+ var cachedValue = cache .get (relatedResourceID ). orElse ( Optional . empty ()) ;
73
80
if (cachedValue .map (v -> !v .equals (value )).orElse (true )) {
74
- cache .put (relatedResourceID , value );
81
+ cache .put (relatedResourceID , Optional . of ( value ) );
75
82
getEventHandler ().handleEvent (new Event (relatedResourceID ));
76
83
}
77
84
}
78
85
79
- protected UpdatableCache <T > initCache () {
86
+ protected UpdatableCache <Optional < T > > initCache () {
80
87
return new MapCache <>();
81
88
}
82
89
83
90
public Optional <T > getCachedValue (ResourceID resourceID ) {
84
- return cache .get (resourceID );
91
+ return cache .get (resourceID ). orElse ( Optional . empty ()) ;
85
92
}
86
93
87
94
@ Override
@@ -91,7 +98,7 @@ public void stop() throws OperatorException {
91
98
92
99
@ Override
93
100
public Optional <T > getAssociated (P primary ) {
94
- return cache .get (ResourceID .fromResource (primary ));
101
+ return cache .get (ResourceID .fromResource (primary )). orElse ( Optional . empty ()) ;
95
102
}
96
103
97
104
protected static class MapCache <T > implements UpdatableCache <T > {
0 commit comments