Skip to content

Commit 16063da

Browse files
committed
More changes
Signed-off-by: Gaurav Bafna <[email protected]>
1 parent cc61ad4 commit 16063da

File tree

2 files changed

+104
-55
lines changed

2 files changed

+104
-55
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,28 +1030,39 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Interr
10301030
indexDocuments(client, indexName2, numDocsInIndex2);
10311031
ensureGreen(indexName1, indexName2);
10321032

1033-
int concurrentSnapshots = 5;
1033+
Thread thread = new Thread(() -> {
1034+
try {
1035+
String snapshotName = "snapshot-earlier-master";
1036+
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
1037+
.cluster()
1038+
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
1039+
.setWaitForCompletion(true)
1040+
.get();
1041+
} catch (Exception e) {}
1042+
});
1043+
1044+
thread.start();
10341045

1035-
String snapshotName = "snapshot-concurrent-";
1046+
// stop existing master
1047+
final String clusterManagerNode = internalCluster().getClusterManagerName();
1048+
stopNode(clusterManagerNode);
1049+
1050+
// Validate that at least one snapshot has been created
1051+
String snapshotName = "new-snapshot";
10361052
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
10371053
.cluster()
10381054
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
10391055
.setWaitForCompletion(false)
10401056
.get();
1041-
1042-
//restart existing master
1043-
final String clusterManagerNode = internalCluster().getClusterManagerName();
1044-
stopNode(clusterManagerNode);
1045-
1046-
// Validate that only one snapshot has been created
10471057
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
10481058
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
10491059
repository.getRepositoryData(repositoryDataPlainActionFuture);
10501060

10511061
RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
1052-
assertEquals(repositoryData.getSnapshotIds().size() , 0);
1062+
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
1063+
thread.join();
10531064
}
1054-
1065+
10551066
public void testCreateSnapshotV2WithRedIndex() throws Exception {
10561067
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
10571068
internalCluster().startDataOnlyNode(pinnedTimestampSettings());

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

Lines changed: 83 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,12 @@ public void onResponse(RepositoryData repositoryData) {
554554

555555
@Override
556556
public void onFailure(Exception e) {
557-
logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} due to {} ", repositoryName, snapshotName, e);
557+
logger.error(
558+
"Failed to upload files to snapshot repo {} for snapshot-v2 {} due to {} ",
559+
repositoryName,
560+
snapshotName,
561+
e
562+
);
558563
listener.onFailure(e);
559564
}
560565
}
@@ -594,6 +599,8 @@ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionLi
594599

595600
private Snapshot snapshot;
596601

602+
boolean enteredLoop;
603+
597604
@Override
598605
public ClusterState execute(ClusterState currentState) {
599606
// move to in progress
@@ -625,13 +632,7 @@ public ClusterState execute(ClusterState currentState) {
625632

626633
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
627634
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
628-
if (tryEnterRepoLoop(repositoryName) == false) {
629-
throw new ConcurrentSnapshotExecutionException(
630-
repositoryName,
631-
snapshotName,
632-
"cannot start snapshot-v2 while a repository is in finalization state"
633-
);
634-
}
635+
635636
final List<IndexId> indexIds = repositoryData.resolveNewIndices(
636637
indices,
637638
getInFlightIndexIds(runningSnapshots, repositoryName),
@@ -662,6 +663,15 @@ public ClusterState execute(ClusterState currentState) {
662663
);
663664
final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(runningSnapshots);
664665
newEntries.add(newEntry);
666+
667+
enteredLoop = tryEnterRepoLoop(repositoryName);
668+
if (enteredLoop == false) {
669+
throw new ConcurrentSnapshotExecutionException(
670+
repositoryName,
671+
snapshotName,
672+
"cannot start snapshot-v2 while a repository is in finalization state"
673+
);
674+
}
665675
return ClusterState.builder(currentState)
666676
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(new ArrayList<>(newEntries)))
667677
.build();
@@ -671,7 +681,7 @@ public ClusterState execute(ClusterState currentState) {
671681
public void onFailure(String source, Exception e) {
672682
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot-v2", repositoryName, snapshotName), e);
673683
listener.onFailure(e);
674-
if ((e instanceof ConcurrentSnapshotExecutionException) == false) {
684+
if (enteredLoop) {
675685
leaveRepoLoop(repositoryName);
676686
}
677687

@@ -685,7 +695,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
685695
newEntry.indices(),
686696
repositoryData
687697
);
688-
689698
final List<String> dataStreams = indexNameExpressionResolver.dataStreamNames(
690699
newState,
691700
request.indicesOptions(),
@@ -705,11 +714,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
705714
true,
706715
pinnedTimestamp
707716
);
717+
// if (snapshotName.contains("snapshot-concurrent-")) {
718+
// try {
719+
// listener.onResponse(snapshotInfo);
720+
// leaveRepoLoop(repositoryName);
721+
// return;
722+
// } catch (Exception e) {
723+
// }
724+
// }
725+
708726
final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null);
709727
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
710-
pinnedTimestampListener.whenComplete(repoData -> {
711-
listener.onResponse(snapshotInfo);
712-
}, listener::onFailure);
728+
pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure);
713729
repository.finalizeSnapshot(
714730
shardGenerations,
715731
repositoryData.getGenId(),
@@ -721,7 +737,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
721737
new ActionListener<RepositoryData>() {
722738
@Override
723739
public void onResponse(RepositoryData repositoryData) {
724-
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
740+
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
725741
failSnapshotCompletionListeners(
726742
snapshot,
727743
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
@@ -732,7 +748,7 @@ public void onResponse(RepositoryData repositoryData) {
732748

733749
return;
734750
}
735-
logger.info("Process it now");
751+
endingSnapshots.remove(snapshot);
736752
leaveRepoLoop(repositoryName);
737753
updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener);
738754
}
@@ -1667,9 +1683,10 @@ public void applyClusterState(ClusterChangedEvent event) {
16671683
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
16681684
final boolean newClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager() == false;
16691685
if (newClusterManager && snapshotsInProgress.entries().isEmpty() == false) {
1670-
logger.info("Cleaning it now");
1671-
// clean up snapshot v2 in progress or clone v2 present
1672-
stateWithoutSnapshotv2(event.state());
1686+
// clean up snapshot v2 in progress or clone v2 present.
1687+
// Snapshot v2 create and clone are synchronous operations. If the cluster manager fails midway, we won't
1688+
// send an ack to the caller and won't continue on the new cluster manager. The caller will need to retry.
1689+
stateWithoutSnapshotV2(event.state());
16731690
}
16741691
processExternalChanges(
16751692
newClusterManager || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()),
@@ -1782,7 +1799,14 @@ private void processExternalChanges(boolean changedNodes, boolean startShards) {
17821799
@Override
17831800
public ClusterState execute(ClusterState currentState) {
17841801
RoutingTable routingTable = currentState.routingTable();
1785-
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
1802+
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
1803+
// Removing shallow snapshots v2 as we take care of these in stateWithoutSnapshotV2()
1804+
snapshots = SnapshotsInProgress.of(
1805+
snapshots.entries()
1806+
.stream()
1807+
.filter(snapshot -> snapshot.remoteStoreIndexShallowCopyV2() == false)
1808+
.collect(Collectors.toList())
1809+
);
17861810
DiscoveryNodes nodes = currentState.nodes();
17871811
boolean changed = false;
17881812
final EnumSet<State> statesToUpdate;
@@ -1839,7 +1863,7 @@ public ClusterState execute(ClusterState currentState) {
18391863
changed = true;
18401864
logger.debug("[{}] was found in dangling INIT or ABORTED state", snapshot);
18411865
} else {
1842-
if (snapshot.state().completed() || completed(snapshot.shards().values())) {
1866+
if ((snapshot.state().completed() || completed(snapshot.shards().values()))) {
18431867
finishedSnapshots.add(snapshot);
18441868
}
18451869
updatedSnapshotEntries.add(snapshot);
@@ -2365,9 +2389,8 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn
23652389
return readyDeletions(result).v1();
23662390
}
23672391

2368-
private ClusterState stateWithoutSnapshotv2(ClusterState state) {
2392+
private void stateWithoutSnapshotV2(ClusterState state) {
23692393
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
2370-
ClusterState result = state;
23712394
boolean changed = false;
23722395
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
23732396
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
@@ -2378,32 +2401,44 @@ private ClusterState stateWithoutSnapshotv2(ClusterState state) {
23782401
}
23792402
}
23802403
if (changed) {
2381-
result = ClusterState.builder(state)
2382-
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries)))
2383-
.build();
2384-
2385-
ClusterState finalResult = result;
2386-
clusterService.submitStateUpdateTask("update snapshot v2 after cluster manager switch", new ClusterStateUpdateTask() {
2387-
2388-
@Override
2389-
public ClusterState execute(ClusterState currentState) throws Exception {
2390-
return finalResult;
2391-
}
2404+
clusterService.submitStateUpdateTask(
2405+
"remove in progress snapshot v2 after cluster manager switch",
2406+
new ClusterStateUpdateTask() {
2407+
@Override
2408+
public ClusterState execute(ClusterState currentState) {
2409+
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
2410+
boolean changed = false;
2411+
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
2412+
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
2413+
if (entry.remoteStoreIndexShallowCopyV2()) {
2414+
changed = true;
2415+
} else {
2416+
entries.add(entry);
2417+
}
2418+
}
2419+
if (changed) {
2420+
return ClusterState.builder(currentState)
2421+
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries)))
2422+
.build();
2423+
} else {
2424+
return currentState;
2425+
}
2426+
}
23922427

2393-
@Override
2394-
public void onFailure(String source, Exception e) {
2395-
// execute never fails today, so we should never hit this.
2396-
logger.warn(
2397-
() -> new ParameterizedMessage(
2398-
"failed to remove in progress snapshot v2 state after cluster manager switch",
2399-
source
2400-
),
2401-
e
2402-
);
2428+
@Override
2429+
public void onFailure(String source, Exception e) {
2430+
// execute never fails, so we should never hit this.
2431+
logger.warn(
2432+
() -> new ParameterizedMessage(
2433+
"failed to remove in progress snapshot v2 state after cluster manager switch",
2434+
source
2435+
),
2436+
e
2437+
);
2438+
}
24032439
}
2404-
});
2440+
);
24052441
}
2406-
return result;
24072442
}
24082443

24092444
/**
@@ -3556,6 +3591,9 @@ public boolean assertAllListenersResolved() {
35563591
+ " on ["
35573592
+ localNode
35583593
+ "]";
3594+
if (repositoryOperations.isEmpty() == false) {
3595+
logger.info("Not empty");
3596+
}
35593597
assert repositoryOperations.isEmpty() : "Found leaked snapshots to finalize " + repositoryOperations + " on [" + localNode + "]";
35603598
return true;
35613599
}

0 commit comments

Comments
 (0)