Fix Shallow copy snapshot failures on closed index #16868

Merged: 7 commits, Jan 9, 2025
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))
- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))

### Security

@@ -39,6 +39,9 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -1078,4 +1081,67 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms"));

assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("Create shallow snapshot setting enabled repo");
String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
Path shallowSnapshotRepoPath = randomRepoPath();
Settings.Builder settings = Settings.builder()
.put("location", shallowSnapshotRepoPath)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE);
createRepository(shallowSnapshotRepoName, "fs", settings);

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}
flushAndRefresh(INDEX_NAME);

logger.info("Verify shallow snapshot created before close");
final String snapshot1 = "snapshot1";
SnapshotInfo snapshotInfo1 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state());
assertTrue(snapshotInfo1.successfulShards() > 0);
assertEquals(0, snapshotInfo1.failedShards());

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}

// close index
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(1000);
logger.info("Verify shallow snapshot created after close");
final String snapshot2 = "snapshot2";

SnapshotInfo snapshotInfo2 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state());
assertTrue(snapshotInfo2.successfulShards() > 0);
assertEquals(0, snapshotInfo2.failedShards());
}
}
22 changes: 22 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1624,6 +1624,28 @@
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}

/**
* Reads the last metadata file uploaded to the remote store and fetches the files present in that commit along with their sizes.
* @return Tuple(Tuple(primaryTerm, commitGeneration), indexFilesToFileLengthMap)
* @throws IOException if the latest metadata file cannot be read from the remote store
*/
public Tuple<Tuple<Long, Long>, Map<String, Long>> acquireLastRemoteUploadedIndexCommit() throws IOException {
if (!indexSettings.isAssignedOnRemoteNode()) {
throw new IllegalStateException("Index is not assigned on Remote Node");

}
RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile();
if (lastUploadedMetadata == null) {
throw new IllegalStateException("No metadata file found in remote store");

}
final Map<String, Long> indexFilesToFileLengthMap = lastUploadedMetadata.getMetadata()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength()));
long primaryTerm = lastUploadedMetadata.getPrimaryTerm();
long commitGeneration = lastUploadedMetadata.getGeneration();
return new Tuple<>(new Tuple<>(primaryTerm, commitGeneration), indexFilesToFileLengthMap);
}
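
The method above packs `(primaryTerm, commitGeneration)` together with the per-file length map into nested tuples. As a minimal, self-contained sketch of how a caller unpacks that return value (using a stand-in `Tuple` record rather than the actual `org.opensearch.common.collect.Tuple`, so the snippet compiles on its own):

```java
import java.util.Map;

public class RemoteCommitTupleDemo {
    // Stand-in for org.opensearch.common.collect.Tuple so the sketch is self-contained.
    record Tuple<A, B>(A v1, B v2) { }

    static String describe(Tuple<Tuple<Long, Long>, Map<String, Long>> lastCommit) {
        long primaryTerm = lastCommit.v1().v1();      // term under which the commit was uploaded
        long commitGeneration = lastCommit.v1().v2(); // Lucene commit generation
        long totalBytes = lastCommit.v2().values().stream().mapToLong(Long::longValue).sum();
        return "term=" + primaryTerm + " gen=" + commitGeneration + " bytes=" + totalBytes;
    }

    public static void main(String[] args) {
        Tuple<Tuple<Long, Long>, Map<String, Long>> commit =
            new Tuple<>(new Tuple<>(2L, 5L), Map.of("_0.cfs", 100L, "segments_5", 20L));
        System.out.println(describe(commit)); // term=2 gen=5 bytes=120
    }
}
```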

/**
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
@@ -416,6 +416,45 @@
throw new UnsupportedOperationException();
}

/**
* Adds a reference to remote store data for an index commit point.
* <p>
* The index commit point can be obtained using the {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method.
* For a closed index, it can instead be obtained by reading the last remote uploaded metadata using the {@link org.opensearch.index.shard.IndexShard#acquireLastRemoteUploadedIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point; that is done by the caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
* @param snapshotStatus snapshot status
* @param primaryTerm current primary term
* @param commitGeneration current commit generation
* @param startTime start time of the snapshot commit; used as the start time of the snapshot
* @param indexFilesToFileLengthMap map of index files to file length
* @param listener listener invoked on completion
*/
default void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
@Nullable IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
@Nullable Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
throw new UnsupportedOperationException();

}
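
The new `snapshotRemoteStoreIndexShard` overload is added as a `default` method, so existing `Repository` implementations keep compiling and only fail fast if the new path is actually invoked. A self-contained sketch of that interface-evolution pattern (the `Repo` interface and method names here are hypothetical, not the actual OpenSearch types):

```java
public class DefaultMethodDemo {
    interface Repo {
        String snapshotShard(String index); // original API

        default String snapshotRemoteStoreShard(String index) {
            // Implementations that predate this method inherit a fail-fast stub.
            throw new UnsupportedOperationException("remote-store snapshots not supported");
        }
    }

    static class LegacyRepo implements Repo {
        public String snapshotShard(String index) { return "full:" + index; }
    }

    static class RemoteAwareRepo implements Repo {
        public String snapshotShard(String index) { return "full:" + index; }

        @Override
        public String snapshotRemoteStoreShard(String index) { return "shallow:" + index; }
    }

    public static void main(String[] args) {
        System.out.println(new RemoteAwareRepo().snapshotRemoteStoreShard("idx")); // shallow:idx
        try {
            new LegacyRepo().snapshotRemoteStoreShard("idx");
        } catch (UnsupportedOperationException e) {
            System.out.println("legacy repo rejected remote-store snapshot");
        }
    }
}
```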

/**
* Restores snapshot of the shard.
* <p>
@@ -3753,27 +3753,42 @@
String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}
if (snapshotIndexCommit == null && indexFilesToFileLengthMap == null) {
listener.onFailure(new RepositoryException(metadata.name(), "both snapshot index commit and index files map cannot be null"));
return;

}

final ShardId shardId = store.shardId();
try {
final String generation = snapshotStatus.generation();
logger.info("[{}] [{}] shallow copy snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);

long indexTotalFileSize = 0;
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
List<String> fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
List<String> fileNames;

if (snapshotIndexCommit != null) {
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
}
} else {
fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet());
indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();

}

int indexTotalNumberOfFiles = fileNames.size();

snapshotStatus.moveToStarted(
@@ -3784,7 +3799,7 @@
indexTotalFileSize
);

final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration);

// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
@@ -3795,7 +3810,7 @@
snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
primaryTerm,
snapshotIndexCommit.getGeneration(),
commitGeneration,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
indexTotalNumberOfFiles,
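
For a closed index there is no live `IndexCommit`, so the hunk above derives the file list and total size from `indexFilesToFileLengthMap` instead of from store metadata. A self-contained sketch of just that accounting step, using only `java.util` (no OpenSearch types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SnapshotSizeDemo {
    // Mirrors the branch in the diff: use the commit's file names when a commit
    // exists, otherwise fall back to the remote-uploaded file-length map.
    static long totalSize(List<String> commitFiles, Map<String, Long> fileLengths) {
        if (commitFiles != null) {
            long total = 0;
            for (String f : commitFiles) {
                total += fileLengths.get(f); // per-file lengths looked up from metadata
            }
            return total;
        }
        return fileLengths.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Long> lengths = Map.of("_0.cfe", 10L, "_0.cfs", 200L, "segments_3", 30L);
        System.out.println(totalSize(new ArrayList<>(lengths.keySet()), lengths)); // 240
        System.out.println(totalSize(null, lengths));                              // 240
    }
}
```

Both branches agree on the total, which is what lets the snapshot status accounting stay identical for open and closed indices.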
@@ -44,9 +44,11 @@
import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
@@ -74,7 +76,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -371,7 +372,9 @@
ActionListener<String> listener
) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShardOrNull(shardId.id());
final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE;
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
@@ -398,36 +401,53 @@
if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) {
long startTime = threadPool.relativeTimeInMillis();
long primaryTerm = indexShard.getOperationPrimaryTerm();
// we flush first to make sure we get the latest writes snapshotted
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
long commitGeneration = snapshotIndexCommit.getGeneration();
long commitGeneration = 0L;
Map<String, Long> indexFilesToFileLengthMap = null;
IndexCommit snapshotIndexCommit = null;

try {
if (closedIndex) {
final Tuple<Tuple<Long, Long>, Map<String, Long>> tuple = indexShard.acquireLastRemoteUploadedIndexCommit();
primaryTerm = tuple.v1().v1();
commitGeneration = tuple.v1().v2();
indexFilesToFileLengthMap = tuple.v2();
} else {
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
}
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (NoSuchFileException e) {
wrappedSnapshot.close();
logger.warn(
"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration
);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (IOException e) {

if (closedIndex) {
logger.warn("Exception while reading latest metadata file from remote store");
throw e;

} else {
wrappedSnapshot.close();
logger.warn(

"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration

);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);

}
}
try {
repository.snapshotRemoteStoreIndexShard(
indexShard.store(),
snapshot.getSnapshotId(),
indexId,
snapshotIndexCommit,
getShardStateId(indexShard, snapshotIndexCommit),
null,
snapshotStatus,
primaryTerm,
commitGeneration,
startTime,
ActionListener.runBefore(listener, wrappedSnapshot::close)
indexFilesToFileLengthMap,
closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close)
);
} catch (IndexShardSnapshotFailedException e) {
logger.error(
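
The control flow this hunk introduces is: for a closed index, read the commit info from remote-uploaded metadata and rethrow any `IOException` (there is no live engine to flush); for an open index, fall back to a forced flush and a fresh commit when the first acquisition fails. A self-contained sketch of that branching (the helper and return values here are hypothetical stand-ins, not the OpenSearch API):

```java
import java.io.IOException;

public class ClosedIndexFallbackDemo {
    static String acquireCommit(boolean closedIndex, boolean firstAttemptFails) throws IOException {
        try {
            if (firstAttemptFails) {
                throw new IOException("metadata read / lock acquisition failed");
            }
            return closedIndex ? "remote-metadata-commit" : "live-commit";
        } catch (IOException e) {
            if (closedIndex) {
                // A closed index has no live engine to flush; surface the failure.
                throw e;
            }
            // Open index: force a flush and retry against the newest commit.
            return "flushed-commit";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(acquireCommit(false, true));  // flushed-commit
        System.out.println(acquireCommit(true, false));  // remote-metadata-commit
    }
}
```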