
Remove clone holder and add Cleaner logic to clean up clones in FullFileCachedIndexInput #18662

Merged 3 commits on Jul 4, 2025
server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java
@@ -13,31 +13,32 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;

import java.io.IOException;
import java.lang.ref.Cleaner;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

/**
* Extension of {@link FileCachedIndexInput} for full files, handling clones and slices.
* We maintain a clone map so that clones can be closed when the parent IndexInput is closed, keeping the ref count in the file cache accurate.
* Closing clones explicitly is needed because Lucene does not guarantee that it will close them.
* Since Lucene does not guarantee that it will close clones/slices, we register them with a Cleaner that closes them once they become phantom reachable.
* https://github.com/apache/lucene/blob/8340b01c3cc229f33584ce2178b07b8984daa6a9/lucene/core/src/java/org/apache/lucene/store/IndexInput.java#L32-L33
* @opensearch.experimental
*/
@ExperimentalApi
public class FullFileCachedIndexInput extends FileCachedIndexInput {
private static final Logger logger = LogManager.getLogger(FullFileCachedIndexInput.class);
private final Set<FullFileCachedIndexInput> clones;
private final IndexInputHolder indexInputHolder;
private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory("index-input-cleaner"));

public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) {
this(cache, filePath, underlyingIndexInput, false);
}

public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) {
super(cache, filePath, underlyingIndexInput, isClone);
clones = new HashSet<>();
indexInputHolder = new IndexInputHolder(underlyingIndexInput, isClone, cache, filePath);
CLEANER.register(this, indexInputHolder);
}

/**
@@ -48,7 +49,6 @@
public FullFileCachedIndexInput clone() {
FullFileCachedIndexInput clonedIndexInput = new FullFileCachedIndexInput(cache, filePath, luceneIndexInput.clone(), true);
cache.incRef(filePath);
clones.add(clonedIndexInput);
return clonedIndexInput;
}

@@ -74,7 +74,6 @@
}
IndexInput slicedLuceneIndexInput = luceneIndexInput.slice(sliceDescription, offset, length);
FullFileCachedIndexInput slicedIndexInput = new FullFileCachedIndexInput(cache, filePath, slicedLuceneIndexInput, true);
clones.add(slicedIndexInput);
cache.incRef(filePath);
return slicedIndexInput;
}
@@ -88,21 +87,37 @@
if (isClone) {
cache.decRef(filePath);
}
clones.forEach(indexInput -> {
try {
indexInput.close();
} catch (Exception e) {
logger.trace("Exception while closing clone - {}", e.getMessage());
}
});
try {
luceneIndexInput.close();
} catch (AlreadyClosedException e) {
logger.trace("FullFileCachedIndexInput already closed");
}
luceneIndexInput = null;
clones.clear();
closed = true;
}
}

private static class IndexInputHolder implements Runnable {
private final IndexInput indexInput;
private final FileCache cache;
private final boolean isClone;
private final Path path;

IndexInputHolder(IndexInput indexInput, boolean isClone, FileCache cache, Path path) {
this.indexInput = indexInput;
this.isClone = isClone;
this.cache = cache;
this.path = path;
}

@Override
public void run() {
try {
indexInput.close();
if (isClone) cache.decRef(path);
} catch (IOException e) {
logger.error("Failed to close IndexInput while clearing phantom reachable object");
}
}
}
}
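Side note: the cleanup above follows the standard java.lang.ref.Cleaner idiom. The state needed to release the resource lives in a separate holder (IndexInputHolder here) that must not reference the registered object itself, otherwise the object could never become phantom reachable and the cleaning action would never run. A minimal standalone sketch of that idiom, using hypothetical names (TrackedResource, OPEN counter) rather than the OpenSearch classes:

import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the Cleaner idiom used by FullFileCachedIndexInput; names are illustrative only.
class TrackedResource implements AutoCloseable {
    // One shared Cleaner; it runs cleanup actions on its own daemon thread, similar to
    // Cleaner.create(OpenSearchExecutors.daemonThreadFactory("index-input-cleaner")) above.
    private static final Cleaner CLEANER = Cleaner.create();
    static final AtomicInteger OPEN = new AtomicInteger();

    // The cleanup state must NOT hold a reference to the TrackedResource instance,
    // otherwise the instance could never become phantom reachable.
    private static final class State implements Runnable {
        @Override
        public void run() {
            OPEN.decrementAndGet(); // release the underlying resource here
        }
    }

    private final Cleaner.Cleanable cleanable;

    TrackedResource() {
        OPEN.incrementAndGet();
        this.cleanable = CLEANER.register(this, new State());
    }

    @Override
    public void close() {
        cleanable.clean(); // explicit close runs the cleanup action at most once
    }

    public static void main(String[] args) throws Exception {
        new TrackedResource();                      // "leaked": never closed explicitly
        System.gc();                                // hint only; cleanup runs once the object is phantom reachable
        Thread.sleep(200);
        System.out.println("open = " + OPEN.get()); // usually prints 0, GC timing permitting
    }
}

The PR's IndexInputHolder plays the role of State here: it captures only the underlying Lucene IndexInput, the FileCache, and the path, never the FullFileCachedIndexInput itself.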
FileCacheTests.java
@@ -8,6 +8,8 @@

package org.opensearch.index.store.remote.filecache;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -18,6 +20,7 @@
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
@@ -28,6 +31,7 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class FileCacheTests extends OpenSearchTestCase {
// need concurrency level to be static to make these tests more deterministic because capacity per segment is dependent on
// (total capacity) / (concurrency level) so having high concurrency level might trigger early evictions which is tolerable in real life
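The @ThreadLeakFilters annotation is needed because the shared Cleaner keeps its index-input-cleaner daemon thread alive after a test finishes, which the randomizedtesting framework would otherwise flag as a thread leak. The real CleanerDaemonThreadLeakFilter is not part of this diff; the sketch below only illustrates what such a filter generally looks like, and the class name and matched thread name are assumptions:

import com.carrotsearch.randomizedtesting.ThreadFilter;

// Illustrative only: the actual CleanerDaemonThreadLeakFilter lives elsewhere in the
// OpenSearch code base and may differ. The thread name mirrors the factory name used
// in FullFileCachedIndexInput: daemonThreadFactory("index-input-cleaner").
public class ExampleCleanerThreadLeakFilter implements ThreadFilter {
    @Override
    public boolean reject(Thread t) {
        // Returning true tells the test framework to ignore this thread when it
        // checks for threads that outlive the test suite.
        return t.getName().contains("index-input-cleaner");
    }
}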
FileCachedIndexInputTests.java
@@ -8,18 +8,22 @@

package org.opensearch.index.store.remote.filecache;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class FileCachedIndexInputTests extends OpenSearchTestCase {

protected FileCache fileCache;
FullFileCachedIndexInputTests.java
@@ -8,17 +8,22 @@

package org.opensearch.index.store.remote.filecache;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class FullFileCachedIndexInputTests extends FileCachedIndexInputTests {
private FullFileCachedIndexInput fullFileCachedIndexInput;

@Override
protected void setupIndexInputAndAddToFileCache() {
fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, filePath, underlyingIndexInput);
// Putting in the file cache would increase refCount to 1
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fullFileCachedIndexInput));
}

@@ -37,15 +42,11 @@ public void testClone() throws IOException {
fileCache.decRef(filePath);
assertFalse(isActiveAndTotalUsageSame());

// After cloning the refCount will increase again and activeUsage and totalUsage will be same again
FileCachedIndexInput clonedFileCachedIndexInput1 = fullFileCachedIndexInput.clone();
FileCachedIndexInput clonedFileCachedIndexInput2 = clonedFileCachedIndexInput1.clone();
FileCachedIndexInput clonedFileCachedIndexInput3 = clonedFileCachedIndexInput2.clone();
assertTrue(isActiveAndTotalUsageSame());
// Since no clones have been done, refCount should be zero
assertEquals((int) fileCache.getRef(filePath), 0);

// closing the first level clone will close all subsequent level clones and reduce ref count to 0
clonedFileCachedIndexInput1.close();
assertFalse(isActiveAndTotalUsageSame());
createUnclosedClonesSlices(false);
triggerGarbageCollectionAndAssertClonesClosed();

fileCache.prune();

@@ -68,12 +69,38 @@ public void testSlice() throws IOException {
fileCache.decRef(filePath);
assertFalse(isActiveAndTotalUsageSame());

// Creating a slice will increase the refCount
IndexInput slicedFileCachedIndexInput = fullFileCachedIndexInput.slice(SLICE_DESC, 1, 2);
assertTrue(isActiveAndTotalUsageSame());
// Since no clones have been done, refCount should be zero
assertEquals((int) fileCache.getRef(filePath), 0);

createUnclosedClonesSlices(true);
triggerGarbageCollectionAndAssertClonesClosed();

// Closing the parent will close all the slices as well decreasing the refCount to 0
fullFileCachedIndexInput.close();
assertFalse(isActiveAndTotalUsageSame());
}

private void triggerGarbageCollectionAndAssertClonesClosed() {
try {
// Clones/Slices will be phantom reachable now, triggering gc should call close on them
assertBusy(() -> {
System.gc(); // Do not rely on GC to be deterministic, hence the polling
assertEquals(
"Expected refCount to drop to zero as all clones/slices should have closed",
(int) fileCache.getRef(filePath),
0
);
}, 5, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("Exception thrown while triggering gc", e);
fail();
}
}

private void createUnclosedClonesSlices(boolean createSlice) throws IOException {
int NUM_OF_CLONES = 3;
for (int i = 0; i < NUM_OF_CLONES; i++) {
if (createSlice) fullFileCachedIndexInput.slice("slice", 1, 2);
else fullFileCachedIndexInput.clone();
}
assertEquals((int) fileCache.getRef(filePath), NUM_OF_CLONES);
}
}
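triggerGarbageCollectionAndAssertClonesClosed() has to poll because System.gc() is only a request; there is no guarantee the collector runs, or that the Cleaner thread has processed the phantom reference, by the time the call returns. A self-contained sketch of the same polling pattern outside the OpenSearch test harness (assertBusy is replaced here by a plain deadline loop):

import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone illustration of "trigger GC and wait for the Cleaner" as used in the test above.
public class GcPollingExample {
    public static void main(String[] args) throws Exception {
        Cleaner cleaner = Cleaner.create();
        AtomicBoolean cleaned = new AtomicBoolean(false);

        Object leaked = new Object();
        cleaner.register(leaked, () -> cleaned.set(true));
        leaked = null; // drop the only strong reference so the object can become phantom reachable

        long deadline = System.currentTimeMillis() + 5_000; // mirrors the 5 second assertBusy timeout
        while (!cleaned.get() && System.currentTimeMillis() < deadline) {
            System.gc();       // request a collection; not guaranteed to act immediately
            Thread.sleep(100); // give the Cleaner thread time to run the action
        }
        System.out.println("cleaner ran: " + cleaned.get());
    }
}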