-
Notifications
You must be signed in to change notification settings - Fork 2.3k
[Remote Routing Table] Add write flow for remote routing table #13870
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 42 commits
Commits
Show all changes
66 commits
Select commit
Hold shift + click to select a range
d5a17ed
Initial commit for RemoteRoutingTableService setup
himshikha 9176015
Adds unit test for remote routing setup
7d9badd
Updating remote routing table setting name
ad7ecf5
Update remote routing table repo setting name
a66aaaf
Initial commit for index routing table manifest
Bukhtawar ad480ee
Changes for IndexRoutingTableHeader
Bukhtawar 2ad70c4
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar acc172e
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar f65b102
Changes for IndexRoutingTableInputStream
Bukhtawar 441f520
Fixing IndexRoutingTableInputStream and moving checksum to end to file
ee70dca
Add read flow for IndexRoutingTable
alchemist51 7aeecc8
Moving routing table version from IndexRouting stream to manifest
0b70dea
Merge branch 'main' into remote_routing
himshikha 7b2bc79
Refactor reader and add failure test
alchemist51 88d266f
Fix GatewayMetaStatePersistedStateTests
alchemist51 4c97869
Moving codec to version 2 for compatibility with manifest parser
2107372
spotless apply and fixing gradle failures
ce04f80
Merge branch 'main' into remote_routing
himshikha 89c33bc
Remove unused methods
df64105
Handle routingTable repo check in JoinTaskExecutor
b80c89b
Merge branch 'main' into remote_routing
59a0e90
spotless fix
51333c9
Reverting accidental commit
418d9fd
Removing buffer logic
0400f40
Merge branch 'main' into remote_routing_input_stream
802e04f
Move repo check logic to RemoteStoreNodeAttr
7f33ba2
Move BufferedChecksum streams to libs/core
66d02d9
Merge branch 'remote_routing_input_stream' into async_write_pr
42efb38
Spotless fix
a204663
Merge branch 'remote_routing_input_stream' into async_write_pr
9b50f8a
Refactor RemoteIndexRoutingTable read
alchemist51 a7f7cab
Add Manifest Tests and spotless fix
alchemist51 49bf7d0
Draft changes for remote routing table write
71e151f
Fix remoteClusterServiceTests
alchemist51 a9ab9c1
Remote store objects to implement writeable interface
4167eeb
addressing pr comments
4194886
Merge branch 'main' into remote_routing
e486165
Addressing PR comments
b3c18d8
test fixes
5d65317
Merge branch 'remote_routing_input_stream' into async_write_pr
bfde8a0
Merge branch 'remote_routing' into async_write_pr
50a2a2e
Cleaning up code
8ecf865
addressing PR comments
09782bc
Merge branch 'remote_routing_input_stream' into async_write_pr
dfc9042
node join based on routing table repo
3b75317
Merge branch 'remote_routing' into async_write_pr
b367d00
Merge branch 'main' into remote_routing
7ad7d05
rebasing
0e5862d
handle case in node join
b120ce0
Merge branch 'remote_routing' into async_write_pr
bc1c1c7
Adding tests
a27b48d
Add tests
fe6568c
Merge branch 'main' into async_write_pr
089d4ee
rebasing
41a5f64
spotless fix
df1704f
Move path generation logic to path input
cfd28d9
Recovering Refactor the path type and hash algorith code
e6f41f6
moving remoteRoutingTableService init to factory
b1f73f1
spotless fix
1abe899
PR comments
5d3f5b2
Merge branch 'main' into async_write_pr
himshikha df7339c
Fixing gradle failure
abd6f90
Fixing tests
c015d56
add log
a604d89
Merge branch 'main' into async_write_pr
himshikha 13af300
fixing test failure due to merge
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
237 changes: 237 additions & 0 deletions
237
server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,237 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.cluster.routing.remote; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.cluster.DiffableUtils; | ||
import org.opensearch.cluster.routing.IndexRoutingTable; | ||
import org.opensearch.cluster.routing.RoutingTable; | ||
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; | ||
import org.opensearch.common.io.stream.BytesStreamOutput; | ||
import org.opensearch.common.lifecycle.AbstractLifecycleComponent; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.io.IOUtils; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.gateway.remote.RemoteClusterStateService; | ||
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; | ||
import org.opensearch.node.Node; | ||
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.repositories.Repository; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
|
||
import java.io.IOException; | ||
import java.util.function.Supplier; | ||
|
||
import org.apache.lucene.store.IndexInput; | ||
import org.opensearch.action.LatchedActionListener; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.common.CheckedRunnable; | ||
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.blobstore.stream.write.WritePriority; | ||
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; | ||
|
||
import org.opensearch.common.lucene.store.ByteArrayIndexInput; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest; | ||
import org.opensearch.index.remote.RemoteStoreEnums; | ||
import org.opensearch.index.remote.RemoteStorePathStrategy; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; | ||
|
||
/** | ||
* A Service which provides APIs to upload and download routing table from remote store. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemoteRoutingTableService extends AbstractLifecycleComponent { | ||
|
||
/** | ||
* Cluster setting to specify if routing table should be published to remote store | ||
*/ | ||
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting( | ||
"cluster.remote_store.routing_table.enabled", | ||
false, | ||
Setting.Property.NodeScope, | ||
Setting.Property.Final | ||
); | ||
|
||
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; | ||
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; | ||
public static final String DELIMITER = "__"; | ||
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; | ||
himshikha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class); | ||
private final Settings settings; | ||
private final Supplier<RepositoriesService> repositoriesService; | ||
private BlobStoreRepository blobStoreRepository; | ||
|
||
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) { | ||
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; | ||
this.repositoriesService = repositoriesService; | ||
this.settings = settings; | ||
} | ||
|
||
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() { | ||
@Override | ||
public void write(IndexRoutingTable value, StreamOutput out) throws IOException { | ||
value.writeTo(out); | ||
} | ||
|
||
@Override | ||
public IndexRoutingTable read(StreamInput in, String key) throws IOException { | ||
return IndexRoutingTable.readFrom(in); | ||
} | ||
}; | ||
|
||
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) { | ||
return DiffableUtils.diff( | ||
before.getIndicesRouting(), | ||
after.getIndicesRouting(), | ||
DiffableUtils.getStringKeySerializer(), | ||
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER | ||
); | ||
} | ||
|
||
public CheckedRunnable<IOException> getIndexRoutingAsyncAction( | ||
himshikha marked this conversation as resolved.
Show resolved
Hide resolved
himshikha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ClusterState clusterState, | ||
himshikha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
IndexRoutingTable indexRouting, | ||
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener, | ||
BlobPath clusterBasePath | ||
) throws IOException { | ||
|
||
BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN); | ||
BlobPath path = RemoteStoreEnums.PathType.HASHED_PREFIX.path(RemoteStorePathStrategy.BasePathInput.builder() | ||
.basePath(indexRoutingPath) | ||
.indexUUID(indexRouting.getIndex().getUUID()) | ||
.build(), | ||
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64); | ||
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path); | ||
|
||
himshikha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
final String fileName = getIndexRoutingFileName(); | ||
|
||
ActionListener<Void> completionListener = ActionListener.wrap( | ||
resp -> latchedActionListener.onResponse( | ||
new ClusterMetadataManifest.UploadedIndexMetadata( | ||
indexRouting.getIndex().getName(), | ||
indexRouting.getIndex().getUUID(), | ||
path.buildAsString() + fileName, | ||
himshikha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
INDEX_ROUTING_METADATA_PREFIX | ||
) | ||
), | ||
ex -> latchedActionListener.onFailure(new RemoteClusterStateService.RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)) | ||
); | ||
|
||
return () -> uploadIndex(indexRouting, fileName , blobContainer, completionListener); | ||
} | ||
|
||
|
||
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingToUpload, Set<String> indicesRoutingToDelete) { | ||
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting() | ||
.stream() | ||
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity())); | ||
|
||
indicesRoutingToUpload.forEach( | ||
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting) | ||
); | ||
indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove); | ||
|
||
return new ArrayList<>(allUploadedIndicesRouting.values()); | ||
} | ||
|
||
private void uploadIndex(IndexRoutingTable indexRouting, String fileName, BlobContainer blobContainer, ActionListener<Void> completionListener) throws IOException { | ||
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting); | ||
BytesReference bytesInput = null; | ||
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { | ||
indexRoutingInput.writeTo(streamOutput); | ||
bytesInput = streamOutput.bytes(); | ||
} catch (IOException e) { | ||
throw new IOException("Failed to serialize IndexRoutingTable. ", e); | ||
} | ||
|
||
if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { | ||
try { | ||
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true); | ||
completionListener.onResponse(null); | ||
} catch (IOException e) { | ||
throw new IOException("Failed to write IndexRoutingTable to remote store. ", e); | ||
} | ||
return; | ||
} | ||
|
||
try ( | ||
IndexInput input = new ByteArrayIndexInput("indexrouting",BytesReference.toBytes(bytesInput))) { | ||
try ( | ||
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( | ||
fileName, | ||
fileName, | ||
input.length(), | ||
true, | ||
WritePriority.URGENT, | ||
(size, position) -> new OffsetRangeIndexInputStream(input, size, position), | ||
null, | ||
false | ||
) | ||
) { | ||
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener); | ||
} catch (IOException e) { | ||
throw new IOException("Failed to write IndexRoutingTable to remote store. ", e); | ||
} | ||
} catch (IOException e) { | ||
throw new IOException("Failed to create transfer object for IndexRoutingTable for remote store upload. ", e); | ||
} | ||
|
||
} | ||
|
||
private String getIndexRoutingFileName() { | ||
return String.join( | ||
DELIMITER, | ||
INDEX_ROUTING_FILE_PREFIX, | ||
RemoteStoreUtils.invertLong(System.currentTimeMillis()) | ||
); | ||
|
||
} | ||
|
||
@Override | ||
protected void doClose() throws IOException { | ||
if (blobStoreRepository != null) { | ||
IOUtils.close(blobStoreRepository); | ||
} | ||
} | ||
|
||
@Override | ||
protected void doStart() { | ||
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; | ||
final String remoteStoreRepo = settings.get( | ||
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY | ||
); | ||
assert remoteStoreRepo != null : "Remote routing table repository is not configured"; | ||
final Repository repository = repositoriesService.get().repository(remoteStoreRepo); | ||
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; | ||
blobStoreRepository = (BlobStoreRepository) repository; | ||
} | ||
|
||
@Override | ||
protected void doStop() {} | ||
|
||
} |
10 changes: 10 additions & 0 deletions
10
server/src/main/java/org/opensearch/cluster/routing/remote/package-info.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** Package containing class to perform operations on remote routing table */ | ||
package org.opensearch.cluster.routing.remote; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.