Skip to content

Commit 214486f

Browse files
committed
Making snapshot store/restore rate dynamic
Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]>
1 parent 9a3fc30 commit 214486f

File tree

5 files changed

+141
-6
lines changed

5 files changed

+141
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
### Changed
1515
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))
1616
- Avoid invalid retries in multiple replicas when querying [#17370](https://github.com/opensearch-project/OpenSearch/pull/17370)
17+
- Making snapshot store/restore rate dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
1718

1819
### Dependencies
1920
- Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.0 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923))

server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232

3333
package org.opensearch.repositories;
3434

35+
import org.apache.lucene.store.RateLimiter;
3536
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
37+
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
3638
import org.opensearch.cluster.metadata.RepositoryMetadata;
3739
import org.opensearch.common.settings.Settings;
3840
import org.opensearch.plugins.Plugin;
@@ -42,11 +44,15 @@
4244
import org.opensearch.test.OpenSearchIntegTestCase;
4345
import org.opensearch.transport.client.Client;
4446

47+
import java.nio.file.Path;
4548
import java.util.Collection;
4649
import java.util.Collections;
50+
import java.util.concurrent.ExecutionException;
4751

4852
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
4953
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
54+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_BYTES_PER_SEC;
55+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_BYTES_PER_SEC;
5056
import static org.hamcrest.Matchers.containsString;
5157
import static org.hamcrest.Matchers.equalTo;
5258
import static org.hamcrest.Matchers.hasSize;
@@ -188,4 +194,86 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
188194

189195
thread.join();
190196
}
197+
198+
public void testAdjustBytesPerSecSettingForSnapAndRestore() throws ExecutionException, InterruptedException {
199+
final InternalTestCluster cluster = internalCluster();
200+
final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class)
201+
.iterator()
202+
.next();
203+
204+
// create index
205+
String indexName = "test-index";
206+
createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 0).put(SETTING_NUMBER_OF_SHARDS, 1).build());
207+
index(indexName, "_doc", "1", Collections.singletonMap("user", generateRandomStringArray(1, 1 << 19, false, false)));
208+
flush(indexName);
209+
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(indexName);
210+
long indexSize = client().admin()
211+
.indices()
212+
.stats(indicesStatsRequest)
213+
.get()
214+
.getIndex(indexName)
215+
.getPrimaries()
216+
.getStore()
217+
.getSize()
218+
.getBytes();
219+
220+
// create repository
221+
final String repositoryName = "test-repo";
222+
long rateBytes = indexSize / 20;
223+
Path path = randomRepoPath();
224+
Settings.Builder repoSettings = Settings.builder()
225+
.put("location", path)
226+
.put(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, (rateBytes + "b"))
227+
.put(MAX_RESTORE_BYTES_BYTES_PER_SEC, (rateBytes + "b"));
228+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
229+
client().admin().cluster(),
230+
repositoryName,
231+
FsRepository.TYPE,
232+
true,
233+
repoSettings
234+
);
235+
236+
FsRepository repository = (FsRepository) repositoriesService.repository(repositoryName);
237+
RateLimiter snapshotRateLimiter = repository.snapshotRateLimiter();
238+
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
239+
RateLimiter restoreRateLimiter = repository.restoreRateLimiter();
240+
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
241+
242+
// adjust the store and restore rate of snapshot
243+
rateBytes = indexSize / 30;
244+
repoSettings = Settings.builder()
245+
.put("location", path)
246+
.put(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, (rateBytes + "b"))
247+
.put(MAX_RESTORE_BYTES_BYTES_PER_SEC, (rateBytes + "b"));
248+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
249+
client().admin().cluster(),
250+
repositoryName,
251+
FsRepository.TYPE,
252+
true,
253+
repoSettings
254+
);
255+
repository = (FsRepository) repositoriesService.repository(repositoryName);
256+
snapshotRateLimiter = repository.snapshotRateLimiter();
257+
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
258+
restoreRateLimiter = repository.restoreRateLimiter();
259+
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
260+
261+
// only adjust the store and restore rate of snapshot, without path
262+
rateBytes = indexSize / 40;
263+
repoSettings = Settings.builder()
264+
.put(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, (rateBytes + "b"))
265+
.put(MAX_RESTORE_BYTES_BYTES_PER_SEC, (rateBytes + "b"));
266+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
267+
client().admin().cluster(),
268+
repositoryName,
269+
FsRepository.TYPE,
270+
true,
271+
repoSettings
272+
);
273+
repository = (FsRepository) repositoriesService.repository(repositoryName);
274+
snapshotRateLimiter = repository.snapshotRateLimiter();
275+
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
276+
restoreRateLimiter = repository.restoreRateLimiter();
277+
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
278+
}
191279
}

server/src/main/java/org/opensearch/repositories/RepositoriesService.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,18 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
179179
request.settings(),
180180
CryptoMetadata.fromRequest(request.cryptoSettings())
181181
);
182+
183+
Repository currentRepository = repositories.get(request.name());
184+
boolean isReloadableSettings = currentRepository != null && currentRepository.isReloadableSettings(newRepositoryMetadata);
185+
182186
validate(request.name());
183187
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings(), repositories, settings, this);
184188
if (newRepositoryMetadata.cryptoMetadata() != null) {
185189
validate(newRepositoryMetadata.cryptoMetadata().keyProviderName());
186190
}
187191

188192
final ActionListener<ClusterStateUpdateResponse> registrationListener;
189-
if (request.verify()) {
193+
if (isReloadableSettings == false && request.verify()) {
190194
registrationListener = ActionListener.delegateFailure(listener, (delegatedListener, clusterStateUpdateResponse) -> {
191195
if (clusterStateUpdateResponse.isAcknowledged()) {
192196
// The response was acknowledged - all nodes should know about the new repository, let's verify them
@@ -207,7 +211,9 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
207211

208212
// Trying to create the new repository on cluster-manager to make sure it works
209213
try {
210-
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
214+
if (isReloadableSettings == false) {
215+
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
216+
}
211217
} catch (Exception e) {
212218
registrationListener.onFailure(e);
213219
return;
@@ -223,7 +229,9 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
223229

224230
@Override
225231
public ClusterState execute(ClusterState currentState) {
226-
ensureRepositoryNotInUse(currentState, request.name());
232+
if (isReloadableSettings == false) {
233+
ensureRepositoryNotInUse(currentState, request.name());
234+
}
227235
Metadata metadata = currentState.metadata();
228236
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
229237
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
@@ -480,7 +488,8 @@ public void applyClusterState(ClusterChangedEvent event) {
480488
if (previousMetadata.type().equals(repositoryMetadata.type()) == false
481489
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
482490
// Previous version is different from the version in settings
483-
if (repository.isSystemRepository() && repository.isReloadable()) {
491+
if ((repository.isSystemRepository() && repository.isReloadable())
492+
|| repository.isReloadableSettings(repositoryMetadata)) {
484493
logger.debug(
485494
"updating repository [{}] in-place to use new metadata [{}]",
486495
repositoryMetadata.name(),

server/src/main/java/org/opensearch/repositories/Repository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,10 @@ default boolean isReloadable() {
602602
return false;
603603
}
604604

605+
default boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) {
606+
return false;
607+
}
608+
605609
/**
606610
* Reload the repository inplace
607611
*/

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
264264

265265
public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold();
266266

267+
public static final String MAX_SNAPSHOT_BYTES_BYTES_PER_SEC = "max_snapshot_bytes_per_sec";
268+
269+
public static final String MAX_RESTORE_BYTES_BYTES_PER_SEC = "max_restore_bytes_per_sec";
270+
267271
/**
268272
* Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions.
269273
* This ensures compatibility across various JDK versions. For a practical usage example,
@@ -592,8 +596,8 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
592596
this.metadata = repositoryMetadata;
593597

594598
supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
595-
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
596-
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
599+
snapshotRateLimiter = getRateLimiter(metadata.settings(), MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, new ByteSizeValue(40, ByteSizeUnit.MB));
600+
restoreRateLimiter = getRateLimiter(metadata.settings(), MAX_RESTORE_BYTES_BYTES_PER_SEC, ByteSizeValue.ZERO);
597601
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
598602
remoteUploadLowPriorityRateLimiter = getRateLimiter(
599603
metadata.settings(),
@@ -4326,6 +4330,16 @@ public InputStream maybeRateLimitSnapshots(InputStream stream) {
43264330
return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT);
43274331
}
43284332

4333+
// Visible for testing
4334+
public RateLimiter snapshotRateLimiter() {
4335+
return snapshotRateLimiter;
4336+
}
4337+
4338+
// Visible for testing
4339+
public RateLimiter restoreRateLimiter() {
4340+
return restoreRateLimiter;
4341+
}
4342+
43294343
@Override
43304344
public List<Setting<?>> getRestrictedSystemRepositorySettings() {
43314345
return Arrays.asList(SYSTEM_REPOSITORY_SETTING, READONLY_SETTING, REMOTE_STORE_INDEX_SHALLOW_COPY);
@@ -4715,6 +4729,25 @@ private static Optional<String> extractShallowSnapshotUUID(String blobName) {
47154729
return Optional.empty();
47164730
}
47174731

4732+
public boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) {
4733+
if (metadata.name().equals(newRepositoryMetadata.name()) == false
4734+
|| metadata.type().equals(newRepositoryMetadata.type()) == false
4735+
|| Objects.equals(metadata.cryptoMetadata(), newRepositoryMetadata.cryptoMetadata()) == false) {
4736+
return false;
4737+
}
4738+
Settings newSettings = newRepositoryMetadata.settings();
4739+
Settings currentSettings = metadata.settings();
4740+
for (String key : newSettings.keySet()) {
4741+
if (key.equals(MAX_RESTORE_BYTES_BYTES_PER_SEC) || key.equals(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC)) {
4742+
continue;
4743+
}
4744+
if (currentSettings.hasValue(key) == false || currentSettings.get(key).equals(newSettings.get(key)) == false) {
4745+
return false;
4746+
}
4747+
}
4748+
return true;
4749+
}
4750+
47184751
/**
47194752
* The result of removing a snapshot from a shard folder in the repository.
47204753
*/

0 commit comments

Comments
 (0)