
Commit 802e8ab

Optimize remote store operations during snapshot Deletion (#12319) (#12677)
(cherry picked from commit b265215)

Signed-off-by: Harish Bhakuni <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 965d85a commit 802e8ab

File tree

6 files changed: 366 additions, 79 deletions


server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 67 additions & 4 deletions
@@ -718,10 +718,49 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
         return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
     }
 
+    // Visible for testing
+    Set<String> getMetadataFilesToFilterActiveSegments(
+        final int lastNMetadataFilesToKeep,
+        final List<String> sortedMetadataFiles,
+        final Set<String> lockedMetadataFiles
+    ) {
+        // the idea here is for each deletable md file, we can consider the segments present in non-deletable md file
+        // before this and non-deletable md file after this to compute the active segment files.
+        // For ex:
+        // lastNMetadataFilesToKeep = 3
+        // sortedMetadataFiles = [m1, m2, m3, m4, m5, m6(locked), m7(locked), m8(locked), m9(locked), m10]
+        // lockedMetadataFiles = m6, m7, m8, m9
+        // then the returned set will be (m3, m6, m9)
+        final Set<String> metadataFilesToFilterActiveSegments = new HashSet<>();
+        for (int idx = lastNMetadataFilesToKeep; idx < sortedMetadataFiles.size(); idx++) {
+            if (lockedMetadataFiles.contains(sortedMetadataFiles.get(idx)) == false) {
+                String prevMetadata = (idx - 1) >= 0 ? sortedMetadataFiles.get(idx - 1) : null;
+                String nextMetadata = (idx + 1) < sortedMetadataFiles.size() ? sortedMetadataFiles.get(idx + 1) : null;
+
+                if (prevMetadata != null && (lockedMetadataFiles.contains(prevMetadata) || idx == lastNMetadataFilesToKeep)) {
+                    // if previous metadata of deletable md is locked, add it to md files for active segments.
+                    metadataFilesToFilterActiveSegments.add(prevMetadata);
+                }
+                if (nextMetadata != null && lockedMetadataFiles.contains(nextMetadata)) {
+                    // if next metadata of deletable md is locked, add it to md files for active segments.
+                    metadataFilesToFilterActiveSegments.add(nextMetadata);
+                }
+            }
+        }
+        return metadataFilesToFilterActiveSegments;
+    }
+
     /**
      * Delete stale segment and metadata files
      * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
-     * we just need to read the latest metadata file. All the stale metadata files can be safely deleted.
+     * we just need to read the latest metadata file.
+     * Assumptions:
+     * (1) if a segment file is not present in a md file, it will never be present in any md file created after that, and
+     * (2) if (md1, md2, md3) are in sorted order, it is not possible that a segment file will be in md1 and md3 but not in md2.
+     * <p>
+     * for each deletable md file, segments present in non-deletable md file before this and non-deletable md file
+     * after this are sufficient to compute the list of active or non-deletable segment files referenced by a deletable
+     * md file
      *
      * @param lastNMetadataFilesToKeep number of metadata files to keep
      * @throws IOException in case of I/O error while reading from / writing to remote segment store
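The worked example in the comments above can be checked in isolation. The following standalone sketch (plain Java, illustrative class and method names, not OpenSearch code) mirrors the selection logic just added: with lastNMetadataFilesToKeep = 3 and m6 through m9 locked, only m3, m6 and m9 are selected. For the same inputs, the loop in deleteStaleSegments that is replaced in the hunks below would previously have read all seven retained or locked metadata files (m1, m2, m3 and m6 through m9), so in this example the change cuts the remote metadata reads from seven to three.

// Standalone sketch (illustrative names): mirrors the selection logic of
// getMetadataFilesToFilterActiveSegments for the example given in its comments.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MetadataFilterSketch {

    static Set<String> metadataFilesToFilterActiveSegments(int lastNToKeep, List<String> sorted, Set<String> locked) {
        Set<String> result = new HashSet<>();
        for (int idx = lastNToKeep; idx < sorted.size(); idx++) {
            if (locked.contains(sorted.get(idx)) == false) {
                String prev = (idx - 1) >= 0 ? sorted.get(idx - 1) : null;
                String next = (idx + 1) < sorted.size() ? sorted.get(idx + 1) : null;
                if (prev != null && (locked.contains(prev) || idx == lastNToKeep)) {
                    result.add(prev); // last kept or locked file immediately before a deletable one
                }
                if (next != null && locked.contains(next)) {
                    result.add(next); // locked file immediately after a deletable one
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sorted = List.of("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
        Set<String> locked = Set.of("m6", "m7", "m8", "m9");
        // prints [m3, m6, m9] (HashSet iteration order may vary)
        System.out.println(metadataFilesToFilterActiveSegments(3, sorted, locked));
    }
}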
@@ -760,7 +799,6 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
             .filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
             .collect(Collectors.toList());
 
-        sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
         logger.debug(
             "metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
             metadataFilesEligibleToDelete,
@@ -769,7 +807,14 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
 
         Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
         Set<String> activeSegmentRemoteFilenames = new HashSet<>();
-        for (String metadataFile : sortedMetadataFileList) {
+
+        final Set<String> metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments(
+            lastNMetadataFilesToKeep,
+            sortedMetadataFileList,
+            allLockFiles
+        );
+
+        for (String metadataFile : metadataFilesToFilterActiveSegments) {
             Map<String, UploadedSegmentMetadata> segmentMetadataMap = readMetadataFile(metadataFile).getMetadata();
             activeSegmentFilesMetadataMap.putAll(segmentMetadataMap);
             activeSegmentRemoteFilenames.addAll(
@@ -848,6 +893,25 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
         }
     }
 
+    public static void remoteDirectoryCleanup(
+        RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
+        String remoteStoreRepoForIndex,
+        String indexUUID,
+        ShardId shardId
+    ) {
+        try {
+            RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
+                remoteStoreRepoForIndex,
+                indexUUID,
+                shardId
+            );
+            remoteSegmentStoreDirectory.deleteStaleSegments(0);
+            remoteSegmentStoreDirectory.deleteIfEmpty();
+        } catch (Exception e) {
+            staticLogger.error("Exception occurred while deleting directory", e);
+        }
+    }
+
     /*
     Tries to delete shard level directory if it is empty
     Return true if it deleted it successfully
@@ -870,7 +934,6 @@ private boolean deleteIfEmpty() throws IOException {
             logger.error("Exception occurred while deleting directory", e);
            return false;
         }
-
         return true;
     }

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Lines changed: 1 addition & 0 deletions
@@ -76,4 +76,5 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
             throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
         }
     }
+
 }

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 103 additions & 74 deletions
@@ -117,6 +117,7 @@
 import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
 import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
 import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
@@ -237,6 +238,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         Setting.Property.Deprecated
     );
 
+    private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class);
+
     /**
      * Setting to disable caching of the latest repository data.
      */
@@ -1161,6 +1164,78 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
         }
     }
 
+    public static void remoteDirectoryCleanupAsync(
+        RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
+        ThreadPool threadpool,
+        String remoteStoreRepoForIndex,
+        String indexUUID,
+        ShardId shardId,
+        String threadPoolName
+    ) {
+        threadpool.executor(threadPoolName)
+            .execute(
+                new RemoteStoreShardCleanupTask(
+                    () -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
+                        remoteDirectoryFactory,
+                        remoteStoreRepoForIndex,
+                        indexUUID,
+                        shardId
+                    ),
+                    indexUUID,
+                    shardId
+                )
+            );
+    }
+
+    protected void releaseRemoteStoreLockAndCleanup(
+        String shardId,
+        String shallowSnapshotUUID,
+        BlobContainer shardContainer,
+        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
+    ) throws IOException {
+        if (remoteStoreLockManagerFactory == null) {
+            return;
+        }
+
+        RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
+            shardContainer,
+            shallowSnapshotUUID,
+            namedXContentRegistry
+        );
+        String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
+        String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
+        // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
+        // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
+        // next delete operation for releasing this lock file
+        RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
+            remoteStoreRepoForIndex,
+            indexUUID,
+            shardId
+        );
+        remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
+        logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
+        if (!isIndexPresent(clusterService, indexUUID)) {
+            // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
+            // index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE
+            // threadpool. if it fails, it could leave some stale files in remote directory. this issue could
+            // even happen in cases of shard level remote store data cleanup which also happens asynchronously.
+            // in long term, we have plans to implement remote store GC poller mechanism which will take care of
+            // such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
+            RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
+                remoteStoreLockManagerFactory.getRepositoriesService(),
+                threadPool
+            );
+            remoteDirectoryCleanupAsync(
+                remoteDirectoryFactory,
+                threadPool,
+                remoteStoreRepoForIndex,
+                indexUUID,
+                new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
+                ThreadPool.Names.REMOTE_PURGE
+            );
+        }
+    }
+
     // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
     // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
     private void executeStaleShardDelete(
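RemoteStoreShardCleanupTask, which remoteDirectoryCleanupAsync submits to the REMOTE_PURGE threadpool above, is defined elsewhere in this commit and is not shown in these hunks. A plausible standalone sketch of such a wrapper, assuming its purpose is simply to avoid running more than one cleanup for the same shard at a time, might look like the following (names and behaviour here are assumptions, not the committed implementation):

// Hypothetical sketch only: a Runnable wrapper that skips a cleanup if one is
// already in flight for the same shard, so repeated snapshot deletes do not
// queue duplicate remote-store work for the same shard.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class ShardCleanupTaskSketch implements Runnable {

    private static final Set<String> ongoingCleanups = ConcurrentHashMap.newKeySet();

    private final Runnable cleanup;
    private final String shardKey; // e.g. "<indexUUID>/<shardId>", assumed key format

    ShardCleanupTaskSketch(Runnable cleanup, String indexUUID, int shardId) {
        this.cleanup = cleanup;
        this.shardKey = indexUUID + "/" + shardId;
    }

    @Override
    public void run() {
        // Claim the shard; if another cleanup already holds it, skip this one.
        if (ongoingCleanups.add(shardKey)) {
            try {
                cleanup.run();
            } finally {
                ongoingCleanups.remove(shardKey);
            }
        }
    }
}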
@@ -1172,53 +1247,34 @@ private void executeStaleShardDelete(
         if (filesToDelete != null) {
             threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
                 try {
-                    if (remoteStoreLockManagerFactory != null) {
-                        for (String fileToDelete : filesToDelete) {
-                            if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
-                                String[] fileToDeletePath = fileToDelete.split("/");
-                                String indexId = fileToDeletePath[1];
-                                String shardId = fileToDeletePath[2];
-                                String shallowSnapBlob = fileToDeletePath[3];
-                                String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
-                                BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
-                                RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                                    REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                                        shardContainer,
-                                        snapshotUUID,
-                                        namedXContentRegistry
-                                    );
-                                String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
-                                String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
-                                // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
-                                // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
-                                // next delete operation for releasing this lock file
-                                RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
-                                    remoteStoreRepoForIndex,
-                                    indexUUID,
-                                    shardId
-                                );
-                                remoteStoreMetadataLockManager.release(
-                                    FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
+                    // filtering files for which remote store lock release and cleanup succeeded,
+                    // remaining files for which it failed will be retried in next snapshot delete run.
+                    List<String> eligibleFilesToDelete = new ArrayList<>();
+                    for (String fileToDelete : filesToDelete) {
+                        if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
+                            String[] fileToDeletePath = fileToDelete.split("/");
+                            String indexId = fileToDeletePath[1];
+                            String shardId = fileToDeletePath[2];
+                            String shallowSnapBlob = fileToDeletePath[3];
+                            String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
+                            BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
+                            try {
+                                releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory);
+                                eligibleFilesToDelete.add(fileToDelete);
+                            } catch (Exception e) {
+                                logger.error(
+                                    "Failed to release lock or cleanup shard for indexID {}, shardID {} " + "and snapshot {}",
+                                    indexId,
+                                    shardId,
+                                    snapshotUUID
                                 );
-                                if (!isIndexPresent(clusterService, indexUUID)) {
-                                    // this is a temporary solution where snapshot deletion triggers remote store side
-                                    // cleanup if index is already deleted. We will add a poller in future to take
-                                    // care of remote store side cleanup.
-                                    // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                                    new RemoteSegmentStoreDirectoryFactory(
-                                        remoteStoreLockManagerFactory.getRepositoriesService(),
-                                        threadPool
-                                    ).newDirectory(
-                                        remoteStoreRepoForIndex,
-                                        indexUUID,
-                                        new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
-                                    ).close();
-                                }
                             }
+                        } else {
+                            eligibleFilesToDelete.add(fileToDelete);
                         }
                     }
                     // Deleting the shard blobs
-                    deleteFromContainer(blobContainer(), filesToDelete);
+                    deleteFromContainer(blobContainer(), eligibleFilesToDelete);
                     l.onResponse(null);
                 } catch (Exception e) {
                     logger.warn(
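The comment at the top of the new loop above describes the retry strategy: only files whose lock release and remote-store cleanup succeeded are handed to deleteFromContainer, while files where it failed keep their shallow-snap blob and are picked up again by the next snapshot delete run. A minimal, generic illustration of that pattern, detached from the repository code and using illustrative names only:

// Generic sketch of the "delete only what was cleaned up" pattern used above:
// items whose cleanup fails keep their marker, so a later run retries them.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RetryByOmissionSketch {

    static List<String> eligibleForDeletion(List<String> markers, Consumer<String> cleanup) {
        List<String> eligible = new ArrayList<>();
        for (String marker : markers) {
            try {
                cleanup.accept(marker);   // release lock / clean up remote data for this marker
                eligible.add(marker);     // success: the marker itself may now be deleted
            } catch (RuntimeException e) {
                // failure: keep the marker so the next delete run retries it
            }
        }
        return eligible;
    }
}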
@@ -1651,39 +1707,12 @@ private void executeOneStaleIndexDelete(
                         for (String blob : shardBlob.getValue().listBlobs().keySet()) {
                             final Optional<String> snapshotUUID = extractShallowSnapshotUUID(blob);
                             if (snapshotUUID.isPresent()) {
-                                RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                                    REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                                        shardBlob.getValue(),
-                                        snapshotUUID.get(),
-                                        namedXContentRegistry
-                                    );
-                                String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
-                                String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
-                                // Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
-                                // while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
-                                // and that would be used during next delete operation for releasing this stale lock file
-                                RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
-                                    remoteStoreRepoForIndex,
-                                    indexUUID,
-                                    shardBlob.getKey()
-                                );
-                                remoteStoreMetadataLockManager.release(
-                                    FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID.get()).build()
+                                releaseRemoteStoreLockAndCleanup(
+                                    shardBlob.getKey(),
+                                    snapshotUUID.get(),
+                                    shardBlob.getValue(),
+                                    remoteStoreLockManagerFactory
                                 );
-                                if (!isIndexPresent(clusterService, indexUUID)) {
-                                    // this is a temporary solution where snapshot deletion triggers remote store side
-                                    // cleanup if index is already deleted. We will add a poller in future to take
-                                    // care of remote store side cleanup.
-                                    // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                                    new RemoteSegmentStoreDirectoryFactory(
-                                        remoteStoreLockManagerFactory.getRepositoriesService(),
-                                        threadPool
-                                    ).newDirectory(
-                                        remoteStoreRepoForIndex,
-                                        indexUUID,
-                                        new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardBlob.getKey()))
-                                    ).close();
-                                }
                             }
                         }
                     }
