Skip to content

Commit 33c3e9e

Browse files
sgup432Aman Khare
authored andcommitted
[Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option (opensearch-project#11874)
Signed-off-by: Sagar Upadhyaya <[email protected]> Signed-off-by: Sagar <[email protected]>
1 parent 0b85ec3 commit 33c3e9e

File tree

54 files changed

+3403
-720
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+3403
-720
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9898

9999
## [Unreleased 2.x]
100100
### Added
101+
- [Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option ([#11874](https://github.com/opensearch-project/OpenSearch/pull/11874))
101102
- Add support for dependencies in plugin descriptor properties with semver range ([#11441](https://github.com/opensearch-project/OpenSearch/pull/11441))
102103
- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121))
103104
- Introduce query level setting `index.query.max_nested_depth` limiting nested queries ([#3268](https://github.com/opensearch-project/OpenSearch/issues/3268)

modules/cache-common/build.gradle

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
opensearchplugin {
10+
description 'Module for caches which are optional and do not require additional security permission'
11+
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
12+
}
13+
14+
test {
15+
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
16+
systemProperty 'tests.security.manager', 'false'
17+
}
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cache.common.tier;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.common.cache.CacheType;
13+
import org.opensearch.common.cache.ICache;
14+
import org.opensearch.common.cache.LoadAwareCacheLoader;
15+
import org.opensearch.common.cache.RemovalListener;
16+
import org.opensearch.common.cache.RemovalNotification;
17+
import org.opensearch.common.cache.store.config.CacheConfig;
18+
import org.opensearch.common.settings.Setting;
19+
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.common.util.concurrent.ReleasableLock;
21+
import org.opensearch.common.util.iterable.Iterables;
22+
23+
import java.io.IOException;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.concurrent.locks.ReadWriteLock;
29+
import java.util.concurrent.locks.ReentrantReadWriteLock;
30+
import java.util.function.Function;
31+
32+
/**
33+
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
34+
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
35+
* then items are eventually evicted from it and removed which will result in cache miss.
36+
*
37+
* @param <K> Type of key
38+
* @param <V> Type of value
39+
*
40+
* @opensearch.experimental
41+
*/
42+
@ExperimentalApi
43+
public class TieredSpilloverCache<K, V> implements ICache<K, V> {
44+
45+
private final ICache<K, V> diskCache;
46+
private final ICache<K, V> onHeapCache;
47+
private final RemovalListener<K, V> removalListener;
48+
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
49+
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
50+
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
51+
/**
52+
* Maintains caching tiers in ascending order of cache latency.
53+
*/
54+
private final List<ICache<K, V>> cacheList;
55+
56+
TieredSpilloverCache(Builder<K, V> builder) {
57+
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
58+
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
59+
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");
60+
61+
this.onHeapCache = builder.onHeapCacheFactory.create(
62+
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
63+
@Override
64+
public void onRemoval(RemovalNotification<K, V> notification) {
65+
try (ReleasableLock ignore = writeLock.acquire()) {
66+
diskCache.put(notification.getKey(), notification.getValue());
67+
}
68+
removalListener.onRemoval(notification);
69+
}
70+
})
71+
.setKeyType(builder.cacheConfig.getKeyType())
72+
.setValueType(builder.cacheConfig.getValueType())
73+
.setSettings(builder.cacheConfig.getSettings())
74+
.setWeigher(builder.cacheConfig.getWeigher())
75+
.build(),
76+
builder.cacheType,
77+
builder.cacheFactories
78+
79+
);
80+
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
81+
this.cacheList = Arrays.asList(onHeapCache, diskCache);
82+
}
83+
84+
// Package private for testing
85+
ICache<K, V> getOnHeapCache() {
86+
return onHeapCache;
87+
}
88+
89+
// Package private for testing
90+
ICache<K, V> getDiskCache() {
91+
return diskCache;
92+
}
93+
94+
@Override
95+
public V get(K key) {
96+
return getValueFromTieredCache().apply(key);
97+
}
98+
99+
@Override
100+
public void put(K key, V value) {
101+
try (ReleasableLock ignore = writeLock.acquire()) {
102+
onHeapCache.put(key, value);
103+
}
104+
}
105+
106+
@Override
107+
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
108+
109+
V cacheValue = getValueFromTieredCache().apply(key);
110+
if (cacheValue == null) {
111+
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
112+
// This is needed as there can be many requests for the same key at the same time and we only want to load
113+
// the value once.
114+
V value = null;
115+
try (ReleasableLock ignore = writeLock.acquire()) {
116+
value = onHeapCache.computeIfAbsent(key, loader);
117+
}
118+
return value;
119+
}
120+
return cacheValue;
121+
}
122+
123+
@Override
124+
public void invalidate(K key) {
125+
// We are trying to invalidate the key from all caches though it would be present in only of them.
126+
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
127+
// also trigger a hit/miss listener event, so ignoring it for now.
128+
try (ReleasableLock ignore = writeLock.acquire()) {
129+
for (ICache<K, V> cache : cacheList) {
130+
cache.invalidate(key);
131+
}
132+
}
133+
}
134+
135+
@Override
136+
public void invalidateAll() {
137+
try (ReleasableLock ignore = writeLock.acquire()) {
138+
for (ICache<K, V> cache : cacheList) {
139+
cache.invalidateAll();
140+
}
141+
}
142+
}
143+
144+
/**
145+
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
146+
* @return An iterable over (onHeap + disk) keys
147+
*/
148+
@SuppressWarnings("unchecked")
149+
@Override
150+
public Iterable<K> keys() {
151+
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
152+
}
153+
154+
@Override
155+
public long count() {
156+
long count = 0;
157+
for (ICache<K, V> cache : cacheList) {
158+
count += cache.count();
159+
}
160+
return count;
161+
}
162+
163+
@Override
164+
public void refresh() {
165+
try (ReleasableLock ignore = writeLock.acquire()) {
166+
for (ICache<K, V> cache : cacheList) {
167+
cache.refresh();
168+
}
169+
}
170+
}
171+
172+
@Override
173+
public void close() throws IOException {
174+
for (ICache<K, V> cache : cacheList) {
175+
cache.close();
176+
}
177+
}
178+
179+
private Function<K, V> getValueFromTieredCache() {
180+
return key -> {
181+
try (ReleasableLock ignore = readLock.acquire()) {
182+
for (ICache<K, V> cache : cacheList) {
183+
V value = cache.get(key);
184+
if (value != null) {
185+
// update hit stats
186+
return value;
187+
} else {
188+
// update miss stats
189+
}
190+
}
191+
}
192+
return null;
193+
};
194+
}
195+
196+
/**
197+
* Factory to create TieredSpilloverCache objects.
198+
*/
199+
public static class TieredSpilloverCacheFactory implements ICache.Factory {
200+
201+
/**
202+
* Defines cache name
203+
*/
204+
public static final String TIERED_SPILLOVER_CACHE_NAME = "tiered_spillover";
205+
206+
/**
207+
* Default constructor
208+
*/
209+
public TieredSpilloverCacheFactory() {}
210+
211+
@Override
212+
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
213+
Settings settings = config.getSettings();
214+
Setting<String> onHeapSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
215+
cacheType.getSettingPrefix()
216+
);
217+
String onHeapCacheStoreName = onHeapSetting.get(settings);
218+
if (!cacheFactories.containsKey(onHeapCacheStoreName)) {
219+
throw new IllegalArgumentException(
220+
"No associated onHeapCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
221+
);
222+
}
223+
ICache.Factory onHeapCacheFactory = cacheFactories.get(onHeapCacheStoreName);
224+
225+
Setting<String> onDiskSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
226+
cacheType.getSettingPrefix()
227+
);
228+
String diskCacheStoreName = onDiskSetting.get(settings);
229+
if (!cacheFactories.containsKey(diskCacheStoreName)) {
230+
throw new IllegalArgumentException(
231+
"No associated diskCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
232+
);
233+
}
234+
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
235+
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
236+
.setOnHeapCacheFactory(onHeapCacheFactory)
237+
.setRemovalListener(config.getRemovalListener())
238+
.setCacheConfig(config)
239+
.setCacheType(cacheType)
240+
.build();
241+
}
242+
243+
@Override
244+
public String getCacheName() {
245+
return TIERED_SPILLOVER_CACHE_NAME;
246+
}
247+
}
248+
249+
/**
250+
* Builder object for tiered spillover cache.
251+
* @param <K> Type of key
252+
* @param <V> Type of value
253+
*/
254+
public static class Builder<K, V> {
255+
private ICache.Factory onHeapCacheFactory;
256+
private ICache.Factory diskCacheFactory;
257+
private RemovalListener<K, V> removalListener;
258+
private CacheConfig<K, V> cacheConfig;
259+
private CacheType cacheType;
260+
private Map<String, ICache.Factory> cacheFactories;
261+
262+
/**
263+
* Default constructor
264+
*/
265+
public Builder() {}
266+
267+
/**
268+
* Set onHeap cache factory
269+
* @param onHeapCacheFactory Factory for onHeap cache.
270+
* @return builder
271+
*/
272+
public Builder<K, V> setOnHeapCacheFactory(ICache.Factory onHeapCacheFactory) {
273+
this.onHeapCacheFactory = onHeapCacheFactory;
274+
return this;
275+
}
276+
277+
/**
278+
* Set disk cache factory
279+
* @param diskCacheFactory Factory for disk cache.
280+
* @return builder
281+
*/
282+
public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
283+
this.diskCacheFactory = diskCacheFactory;
284+
return this;
285+
}
286+
287+
/**
288+
* Set removal listener for tiered cache.
289+
* @param removalListener Removal listener
290+
* @return builder
291+
*/
292+
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
293+
this.removalListener = removalListener;
294+
return this;
295+
}
296+
297+
/**
298+
* Set cache config.
299+
* @param cacheConfig cache config.
300+
* @return builder
301+
*/
302+
public Builder<K, V> setCacheConfig(CacheConfig<K, V> cacheConfig) {
303+
this.cacheConfig = cacheConfig;
304+
return this;
305+
}
306+
307+
/**
308+
* Set cache type.
309+
* @param cacheType Cache type
310+
* @return builder
311+
*/
312+
public Builder<K, V> setCacheType(CacheType cacheType) {
313+
this.cacheType = cacheType;
314+
return this;
315+
}
316+
317+
/**
318+
* Set cache factories
319+
* @param cacheFactories cache factories
320+
* @return builder
321+
*/
322+
public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactories) {
323+
this.cacheFactories = cacheFactories;
324+
return this;
325+
}
326+
327+
/**
328+
* Build tiered spillover cache.
329+
* @return TieredSpilloverCache
330+
*/
331+
public TieredSpilloverCache<K, V> build() {
332+
return new TieredSpilloverCache<>(this);
333+
}
334+
}
335+
}

0 commit comments

Comments
 (0)