Skip to content

Commit 23d1c7a

Browse files
authored
Fix deletion permits flow in RemoteFsTimestampAwareTranslog (#16282)
--------- Signed-off-by: Sachin Kale <[email protected]>
1 parent 35c366d commit 23d1c7a

File tree

5 files changed: 317 additions (+317) and 65 deletions (-65).

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

Lines changed: 41 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -215,21 +215,42 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
215215
logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
216216
if (generationsToBeDeleted.isEmpty() == false) {
217217
// Delete stale generations
218-
translogTransferManager.deleteGenerationAsync(
219-
primaryTermSupplier.getAsLong(),
220-
generationsToBeDeleted,
221-
remoteGenerationDeletionPermits::release
222-
);
218+
try {
219+
translogTransferManager.deleteGenerationAsync(
220+
primaryTermSupplier.getAsLong(),
221+
generationsToBeDeleted,
222+
remoteGenerationDeletionPermits::release
223+
);
224+
} catch (Exception e) {
225+
logger.error("Exception in delete generations flow", e);
226+
// Release permit that is meant for metadata files and return
227+
remoteGenerationDeletionPermits.release();
228+
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
229+
+ remoteGenerationDeletionPermits.availablePermits()
230+
+ " is not equal to "
231+
+ REMOTE_DELETION_PERMITS;
232+
return;
233+
}
223234
} else {
224235
remoteGenerationDeletionPermits.release();
225236
}
226237

227238
if (metadataFilesToBeDeleted.isEmpty() == false) {
228239
// Delete stale metadata files
229-
translogTransferManager.deleteMetadataFilesAsync(
230-
metadataFilesToBeDeleted,
231-
remoteGenerationDeletionPermits::release
232-
);
240+
try {
241+
translogTransferManager.deleteMetadataFilesAsync(
242+
metadataFilesToBeDeleted,
243+
remoteGenerationDeletionPermits::release
244+
);
245+
} catch (Exception e) {
246+
logger.error("Exception in delete metadata files flow", e);
247+
// Permits is already released by deleteMetadataFilesAsync
248+
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
249+
+ remoteGenerationDeletionPermits.availablePermits()
250+
+ " is not equal to "
251+
+ REMOTE_DELETION_PERMITS;
252+
return;
253+
}
233254

234255
// Update cache to keep only those metadata files that are not getting deleted
235256
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
@@ -240,7 +261,12 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
240261
remoteGenerationDeletionPermits.release();
241262
}
242263
} catch (Exception e) {
264+
logger.error("Exception in trimUnreferencedReaders", e);
243265
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
266+
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
267+
+ remoteGenerationDeletionPermits.availablePermits()
268+
+ " is not equal to "
269+
+ REMOTE_DELETION_PERMITS;
244270
}
245271
}
246272

@@ -441,7 +467,8 @@ protected static void deleteStaleRemotePrimaryTerms(
441467
}
442468
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> {
443469
try {
444-
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
470+
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger)
471+
.v1();
445472
} catch (IOException e) {
446473
return Long.MIN_VALUE;
447474
}
@@ -482,7 +509,8 @@ protected static Long getMinPrimaryTermInRemote(
482509
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
483510
String metadataFile,
484511
TranslogTransferManager translogTransferManager,
485-
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
512+
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
513+
Logger logger
486514
) throws IOException {
487515
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
488516
if (minMaxPrimaryTermFromFileName != null) {
@@ -504,6 +532,8 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
504532
if (primaryTerm.isPresent()) {
505533
minPrimaryTem = primaryTerm.get();
506534
}
535+
} else {
536+
logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile);
507537
}
508538
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
509539
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -590,7 +590,14 @@ protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException
590590
generationsToDelete.add(generation);
591591
}
592592
if (generationsToDelete.isEmpty() == false) {
593-
deleteRemoteGeneration(generationsToDelete);
593+
try {
594+
deleteRemoteGeneration(generationsToDelete);
595+
} catch (Exception e) {
596+
logger.error("Exception in delete generations flow", e);
597+
// Release permit that is meant for metadata files and return
598+
remoteGenerationDeletionPermits.release();
599+
return;
600+
}
594601
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
595602
deleteStaleRemotePrimaryTerms();
596603
} else {

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

Lines changed: 42 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -496,19 +496,24 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
496496
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
497497
*/
498498
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
499-
List<String> translogFiles = new ArrayList<>();
500-
generations.forEach(generation -> {
501-
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
502-
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
503-
String translogFileName = Translog.getFilename(generation);
504-
if (isTranslogMetadataEnabled == false) {
505-
translogFiles.addAll(List.of(ckpFileName, translogFileName));
506-
} else {
507-
translogFiles.add(translogFileName);
508-
}
509-
});
510-
// Delete the translog and checkpoint files asynchronously
511-
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
499+
try {
500+
List<String> translogFiles = new ArrayList<>();
501+
generations.forEach(generation -> {
502+
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
503+
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
504+
String translogFileName = Translog.getFilename(generation);
505+
if (isTranslogMetadataEnabled == false) {
506+
translogFiles.addAll(List.of(ckpFileName, translogFileName));
507+
} else {
508+
translogFiles.add(translogFileName);
509+
}
510+
});
511+
// Delete the translog and checkpoint files asynchronously
512+
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
513+
} catch (Exception e) {
514+
onCompletion.run();
515+
throw e;
516+
}
512517
}
513518

514519
/**
@@ -658,37 +663,32 @@ public void deleteTranslogFiles() throws IOException {
658663
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
659664
*/
660665
private void deleteTranslogFilesAsync(long primaryTerm, List<String> files, Runnable onCompletion) {
661-
try {
662-
transferService.deleteBlobsAsync(
663-
ThreadPool.Names.REMOTE_PURGE,
664-
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
665-
files,
666-
new ActionListener<>() {
667-
@Override
668-
public void onResponse(Void unused) {
669-
fileTransferTracker.delete(files);
670-
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
671-
onCompletion.run();
672-
}
666+
transferService.deleteBlobsAsync(
667+
ThreadPool.Names.REMOTE_PURGE,
668+
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
669+
files,
670+
new ActionListener<>() {
671+
@Override
672+
public void onResponse(Void unused) {
673+
fileTransferTracker.delete(files);
674+
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
675+
onCompletion.run();
676+
}
673677

674-
@Override
675-
public void onFailure(Exception e) {
676-
onCompletion.run();
677-
logger.error(
678-
() -> new ParameterizedMessage(
679-
"Exception occurred while deleting translog for primaryTerm={} files={}",
680-
primaryTerm,
681-
files
682-
),
683-
e
684-
);
685-
}
678+
@Override
679+
public void onFailure(Exception e) {
680+
onCompletion.run();
681+
logger.error(
682+
() -> new ParameterizedMessage(
683+
"Exception occurred while deleting translog for primaryTerm={} files={}",
684+
primaryTerm,
685+
files
686+
),
687+
e
688+
);
686689
}
687-
);
688-
} catch (Exception e) {
689-
onCompletion.run();
690-
throw e;
691-
}
690+
}
691+
);
692692
}
693693

694694
/**

0 commit comments

Comments
 (0)