Skip to content

Commit 33d8fce

Browse files
authored
Initialize FileCache for warm index on node boot-up/restart (#18467)
Signed-off-by: Sandeep Kumawat <[email protected]>
1 parent 8ed095a commit 33d8fce

File tree

18 files changed

+678
-94
lines changed

18 files changed

+678
-94
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java

Lines changed: 164 additions & 30 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,20 +189,10 @@ public void testFullFileAndFileCacheStats() throws ExecutionException, Interrupt
189189
Settings settings = Settings.builder()
190190
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
191191
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
192-
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
193192
.build();
194193

195194
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());
196195

197-
// Verify from the cluster settings if the data locality is partial
198-
GetIndexResponse getIndexResponse = client().admin()
199-
.indices()
200-
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
201-
.get();
202-
203-
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
204-
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
205-
206196
// Ingesting docs again before force merge
207197
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
208198
flushAndRefresh(INDEX_NAME_2);

server/src/main/java/org/opensearch/common/cache/RemovalReason.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ public enum RemovalReason {
2121
INVALIDATED,
2222
EVICTED,
2323
EXPLICIT,
24-
CAPACITY
24+
CAPACITY,
25+
RESTARTED // This is used by testing framework to close the CachedIndexInput during node restart.
2526
}

server/src/main/java/org/opensearch/env/NodeEnvironment.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ public NodeEnvironment(Settings settings, Environment environment, IndexStoreLis
389389
}
390390

391391
if (DiscoveryNode.isWarmNode(settings) == false) {
392-
ensureNoFileCacheData(fileCacheNodePath);
392+
ensureNoFileCacheData(fileCacheNodePath, settings);
393393
}
394394

395395
this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
@@ -1204,8 +1204,8 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
12041204
/**
12051205
* Throws an exception if cache exists on a non-warm node.
12061206
*/
1207-
private void ensureNoFileCacheData(final NodePath fileCacheNodePath) throws IOException {
1208-
List<Path> cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
1207+
private void ensureNoFileCacheData(final NodePath fileCacheNodePath, final Settings settings) throws IOException {
1208+
List<Path> cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath, settings);
12091209
if (cacheDataPaths.isEmpty() == false) {
12101210
final String message = String.format(
12111211
Locale.ROOT,
@@ -1278,12 +1278,27 @@ private static boolean isIndexMetadataPath(Path path) {
12781278
* Collect the path containing cache data in the indicated cache node path.
12791279
* The returned paths will point to the shard data folder.
12801280
*/
1281-
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
1281+
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath, Settings settings) throws IOException {
12821282
// Structure is: <file cache path>/<index uuid>/<shard id>/...
12831283
List<Path> indexSubPaths = new ArrayList<>();
1284-
Path fileCachePath = fileCacheNodePath.fileCachePath;
1285-
if (Files.isDirectory(fileCachePath)) {
1286-
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(fileCachePath)) {
1284+
// Process file cache path
1285+
processDirectory(fileCacheNodePath.fileCachePath, indexSubPaths);
1286+
if (DiscoveryNode.isDedicatedWarmNode(settings)) {
1287+
// Process <indices>/... path only for warm nodes.
1288+
processDirectory(fileCacheNodePath.indicesPath, indexSubPaths);
1289+
}
1290+
1291+
return indexSubPaths;
1292+
}
1293+
1294+
@Deprecated(forRemoval = true)
1295+
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
1296+
return collectFileCacheDataPath(fileCacheNodePath, Settings.EMPTY);
1297+
}
1298+
1299+
private static void processDirectory(Path directoryPath, List<Path> indexSubPaths) throws IOException {
1300+
if (Files.isDirectory(directoryPath)) {
1301+
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(directoryPath)) {
12871302
for (Path indexPath : indexStream) {
12881303
if (Files.isDirectory(indexPath)) {
12891304
try (Stream<Path> shardStream = Files.list(indexPath)) {
@@ -1293,8 +1308,6 @@ public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) th
12931308
}
12941309
}
12951310
}
1296-
1297-
return indexSubPaths;
12981311
}
12991312

13001313
/**

server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private void processNoClusterManagerRepurposeNode(
137137

138138
if (repurposeSearch) {
139139
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
140-
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
140+
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath, env.settings());
141141
fileCachePaths = uniqueParentPaths(fileCacheDataPaths, indexMetadataPaths);
142142
}
143143

@@ -227,7 +227,7 @@ private void processClusterManagerRepurposeNode(
227227

228228
if (repurposeSearch) {
229229
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
230-
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
230+
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath, env.settings());
231231
fileCachePaths = uniqueParentPaths(fileCacheDataPaths);
232232
}
233233

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput;
2626
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
2727
import org.opensearch.index.store.remote.filecache.FileCache;
28+
import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput;
2829
import org.opensearch.index.store.remote.utils.FileTypeUtils;
2930
import org.opensearch.index.store.remote.utils.TransferManager;
3031

3132
import java.io.FileNotFoundException;
3233
import java.io.IOException;
34+
import java.nio.file.Files;
3335
import java.nio.file.NoSuchFileException;
3436
import java.nio.file.Path;
3537
import java.util.Arrays;
@@ -285,7 +287,27 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
285287
}
286288
// Return directly from the FileCache (via TransferManager) if complete file is present
287289
Path key = getFilePath(name);
288-
CachedIndexInput indexInput = fileCache.get(key);
290+
291+
CachedIndexInput indexInput = fileCache.compute(key, (path, cachedIndexInput) -> {
292+
// If entry exists and is not closed, use it
293+
if (cachedIndexInput != null && cachedIndexInput.isClosed() == false) {
294+
return cachedIndexInput;
295+
}
296+
297+
// If entry is closed but file exists locally, create new IndexInput from local
298+
if (cachedIndexInput != null && cachedIndexInput.isClosed() && Files.exists(key)) {
299+
try {
300+
assert cachedIndexInput instanceof RestoredCachedIndexInput;
301+
return new CachedFullFileIndexInput(fileCache, key, localDirectory.openInput(name, IOContext.DEFAULT));
302+
} catch (IOException e) {
303+
throw new RuntimeException(e);
304+
}
305+
}
306+
307+
// Return null to fall back to remote store block download/existing block reuse.
308+
return null;
309+
});
310+
289311
if (indexInput != null) {
290312
logger.trace("Composite Directory[{}]: Complete file {} found in FileCache", this::toString, () -> name);
291313
try {
@@ -306,7 +328,8 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
306328
if (uploadedSegmentMetadata == null) {
307329
throw new NoSuchFileException("File " + name + " not found in directory");
308330
}
309-
// TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific
331+
// TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot
332+
// specific
310333
BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
311334
name,
312335
new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST),
@@ -332,7 +355,6 @@ public void close() throws IOException {
332355
fileCache.remove(getFilePath(localFile));
333356
}
334357
}
335-
fileCache.prune();
336358
localDirectory.close();
337359
}
338360

server/src/main/java/org/opensearch/index/store/remote/filecache/AggregateFileCacheStats.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public long getCacheMisses() {
116116
return overallFileCacheStats.getCacheMisses();
117117
}
118118

119+
// visible for testing.
120+
public FileCacheStats getBlockFileCacheStats() {
121+
return blockFileCacheStats;
122+
}
123+
119124
@Override
120125
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
121126
builder.startObject(Fields.AGGREGATE_FILE_CACHE);

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.util.List;
2929
import java.util.function.BiFunction;
3030
import java.util.function.Predicate;
31+
import java.util.stream.Stream;
3132

3233
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
34+
import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER;
3335

3436
/**
3537
* File Cache (FC) is introduced to solve the problem that the local disk cannot hold
@@ -192,6 +194,11 @@ public void logCurrentState() {
192194
theCache.logCurrentState();
193195
}
194196

197+
// To be used only in testing framework.
198+
public void closeIndexInputReferences() {
199+
theCache.closeIndexInputReferences();
200+
}
201+
195202
/**
196203
* Ensures that the PARENT breaker is not tripped when an entry is added to the cache
197204
* @param filePath the path key for which entry is added
@@ -216,32 +223,29 @@ private void checkParentBreaker(Path filePath) {
216223
* directory within the provided file cache path.
217224
*/
218225
public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
219-
fileCacheDataPaths.stream()
220-
.filter(Files::isDirectory)
221-
.map(path -> path.resolve(LOCAL_STORE_LOCATION))
222-
.filter(Files::isDirectory)
223-
.flatMap(dir -> {
224-
try {
225-
return Files.list(dir);
226-
} catch (IOException e) {
227-
throw new UncheckedIOException(
228-
"Unable to process file cache directory. Please clear the file cache for node startup.",
229-
e
230-
);
231-
}
232-
})
233-
.filter(Files::isRegularFile)
234-
.forEach(path -> {
235-
try {
236-
put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path)));
237-
decRef(path.toAbsolutePath());
238-
} catch (IOException e) {
239-
throw new UncheckedIOException(
240-
"Unable to retrieve cache file details. Please clear the file cache for node startup.",
241-
e
242-
);
243-
}
244-
});
226+
Stream.concat(
227+
fileCacheDataPaths.stream()
228+
.filter(Files::isDirectory)
229+
.map(path -> path.resolve(LOCAL_STORE_LOCATION))
230+
.filter(Files::isDirectory),
231+
fileCacheDataPaths.stream()
232+
.filter(Files::isDirectory)
233+
.map(path -> path.resolve(INDICES_FOLDER_IDENTIFIER))
234+
.filter(Files::isDirectory)
235+
).flatMap(dir -> {
236+
try {
237+
return Files.list(dir);
238+
} catch (IOException e) {
239+
throw new UncheckedIOException("Unable to process file cache directory. Please clear the file cache for node startup.", e);
240+
}
241+
}).filter(Files::isRegularFile).forEach(path -> {
242+
try {
243+
put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path)));
244+
decRef(path.toAbsolutePath());
245+
} catch (IOException e) {
246+
throw new UncheckedIOException("Unable to retrieve cache file details. Please clear the file cache for node startup.", e);
247+
}
248+
});
245249
}
246250

247251
/**
@@ -308,10 +312,10 @@ public AggregateFileCacheStats fileCacheStats() {
308312
* These entries are eligible for eviction so if nothing needs to reference
309313
* them they will be deleted when the disk-based local cache fills up.
310314
*/
311-
private static class RestoredCachedIndexInput implements CachedIndexInput {
315+
public static class RestoredCachedIndexInput implements CachedIndexInput {
312316
private final long length;
313317

314-
private RestoredCachedIndexInput(long length) {
318+
public RestoredCachedIndexInput(long length) {
315319
this.length = length;
316320
}
317321

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.nio.file.Path;
2727

2828
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
29+
import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER;
2930

3031
/**
3132
* IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible
@@ -55,29 +56,54 @@ public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
5556
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
5657
if (indexSettings.isRemoteSnapshot()) {
5758
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
58-
cleanupShardFileCache(shardPath);
59+
cleanupShardFileCache(shardPath, false, true);
5960
deleteShardFileCacheDirectory(shardPath);
61+
} else if (indexSettings.isWarmIndex()) {
62+
try {
63+
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnvironment, shardId, indexSettings.customDataPath());
64+
if (shardPath != null) {
65+
cleanupShardFileCache(shardPath, true, false);
66+
deleteShardFileCacheDirectory(shardPath);
67+
}
68+
} catch (IOException e) {
69+
logger.error("failed to delete warm index shard file cache directory", e);
70+
}
6071
}
6172
}
6273

6374
/**
6475
* Cleans up the corresponding index file path entries from FileCache
6576
*
66-
* @param shardPath the shard path
77+
* @param isWarmIndex flag indicating if this is a remote index
78+
* @param isRemoteSnapshot flag indicating if this is a remote snapshot
6779
*/
68-
private void cleanupShardFileCache(ShardPath shardPath) {
80+
private void cleanupShardFileCache(ShardPath shardPath, boolean isWarmIndex, boolean isRemoteSnapshot) {
6981
try {
7082
final FileCache fc = fileCacheProvider.get();
7183
assert fc != null;
72-
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
84+
85+
final Path localStorePath;
86+
if (isWarmIndex) {
87+
localStorePath = shardPath.getDataPath().resolve(INDICES_FOLDER_IDENTIFIER);
88+
} else if (isRemoteSnapshot) {
89+
localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
90+
} else {
91+
return;
92+
}
93+
7394
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
7495
for (Path subPath : ds) {
7596
fc.remove(subPath.toRealPath());
7697
}
7798
}
7899
} catch (IOException ioe) {
100+
String operationType = isWarmIndex ? "warm index" : "remote snapshot";
79101
logger.error(
80-
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
102+
() -> new ParameterizedMessage(
103+
"Error removing items from cache during {} shard deletion {}",
104+
operationType,
105+
shardPath.getShardId()
106+
),
81107
ioe
82108
);
83109
}
@@ -112,6 +138,15 @@ public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, Nod
112138
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
113139
}
114140
}
141+
} else if (indexSettings.isWarmIndex()) {
142+
final Path indicesPathInCache = nodeEnvironment.fileCacheNodePath().indicesPath.resolve(index.getUUID());
143+
if (Files.exists(indicesPathInCache)) {
144+
try {
145+
IOUtils.rm(indicesPathInCache);
146+
} catch (IOException e) {
147+
logger.error(() -> new ParameterizedMessage("Failed to delete indices path in cache for index {}", index), e);
148+
}
149+
}
115150
}
116151
}
117152
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuild
5555
Path key = removalNotification.getKey();
5656
if (removalReason != RemovalReason.REPLACED) {
5757
catchAsRuntimeException(value::close);
58-
catchAsRuntimeException(() -> Files.deleteIfExists(key));
58+
// On RESTARTED removal, we close the IndexInput but preserve the files on disk as this scenario only occurs during
59+
// tests
60+
if (removalReason != RemovalReason.RESTARTED) {
61+
catchAsRuntimeException(() -> Files.deleteIfExists(key));
62+
}
5963
}
6064
});
6165
}

server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
public class FileTypeUtils {
2020

2121
public static String BLOCK_FILE_IDENTIFIER = "_block_";
22+
public static String INDICES_FOLDER_IDENTIFIER = "index";
2223

2324
public static boolean isTempFile(String name) {
2425
return name.endsWith(".tmp");

0 commit comments

Comments
 (0)