Skip to content

Commit bd1f43c

Browse files
committed
Making snapshot store/restore rate dynamic
Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]>
1 parent d723db8 commit bd1f43c

File tree

7 files changed

+286
-41
lines changed

7 files changed

+286
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131

3232
### Changed
3333
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))
34+
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
3435

3536
### Dependencies
3637
- Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))

plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void testSimpleWorkflow() {
160160

161161
public void testMissingUri() {
162162
try {
163-
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", Settings.builder());
163+
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo1", "hdfs", Settings.builder());
164164
fail();
165165
} catch (RepositoryException e) {
166166
assertTrue(e.getCause() instanceof IllegalArgumentException);
@@ -193,7 +193,7 @@ public void testNonHdfsUri() {
193193
public void testPathSpecifiedInHdfs() {
194194
try {
195195
Settings.Builder settings = Settings.builder().put("uri", "hdfs:///some/path");
196-
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings);
196+
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo2", "hdfs", settings);
197197
fail();
198198
} catch (RepositoryException e) {
199199
assertTrue(e.getCause() instanceof IllegalArgumentException);
@@ -204,7 +204,7 @@ public void testPathSpecifiedInHdfs() {
204204
public void testMissingPath() {
205205
try {
206206
Settings.Builder settings = Settings.builder().put("uri", "hdfs:///");
207-
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings);
207+
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo3", "hdfs", settings);
208208
fail();
209209
} catch (RepositoryException e) {
210210
assertTrue(e.getCause() instanceof IllegalArgumentException);

server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java

Lines changed: 129 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.repositories;
3434

35+
import org.apache.lucene.store.RateLimiter;
3536
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
3637
import org.opensearch.cluster.metadata.RepositoryMetadata;
3738
import org.opensearch.common.settings.Settings;
@@ -42,12 +43,18 @@
4243
import org.opensearch.test.OpenSearchIntegTestCase;
4344
import org.opensearch.transport.client.Client;
4445

46+
import java.nio.file.Path;
4547
import java.util.Collection;
4648
import java.util.Collections;
4749
import java.util.concurrent.atomic.AtomicInteger;
4850

4951
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
5052
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
53+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC;
54+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC;
55+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_UPLOAD_BYTES_PER_SEC;
56+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC;
57+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC;
5158
import static org.hamcrest.Matchers.containsString;
5259
import static org.hamcrest.Matchers.equalTo;
5360
import static org.hamcrest.Matchers.hasSize;
@@ -138,10 +145,11 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
138145

139146
// create repository
140147
final String repositoryName = "test-repo";
148+
Path path = randomRepoPath();
141149
Settings.Builder repoSettings = Settings.builder()
142-
.put("location", randomRepoPath())
143-
.put("max_snapshot_bytes_per_sec", "10mb")
144-
.put("max_restore_bytes_per_sec", "10mb");
150+
.put("location", path)
151+
.put(MAX_SNAPSHOT_BYTES_PER_SEC, "10mb")
152+
.put(MAX_RESTORE_BYTES_PER_SEC, "10mb");
145153
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
146154
client().admin().cluster(),
147155
repositoryName,
@@ -176,7 +184,7 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
176184

177185
try {
178186
logger.info("--> begin to reset repository");
179-
repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb");
187+
repoSettings = Settings.builder().put("location", randomRepoPath()).put(MAX_SNAPSHOT_BYTES_PER_SEC, "300mb");
180188
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
181189
client().admin().cluster(),
182190
repositoryName,
@@ -194,4 +202,121 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
194202

195203
thread.join();
196204
}
205+
206+
public void testAdjustBytesPerSecSettingForSnapAndRestore() {
207+
final InternalTestCluster cluster = internalCluster();
208+
final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class)
209+
.iterator()
210+
.next();
211+
212+
// create repository
213+
final String repositoryName = "test-repo1";
214+
long rateBytes = 200000;
215+
Path path = randomRepoPath();
216+
Settings.Builder repoSettings = Settings.builder()
217+
.put("location", path)
218+
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b"))
219+
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
220+
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
221+
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
222+
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
223+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
224+
client().admin().cluster(),
225+
repositoryName,
226+
FsRepository.TYPE,
227+
true,
228+
repoSettings
229+
);
230+
231+
FsRepository repository = (FsRepository) repositoriesService.repository(repositoryName);
232+
RateLimiter snapshotRateLimiter = repository.snapshotRateLimiter();
233+
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
234+
RateLimiter restoreRateLimiter = repository.restoreRateLimiter();
235+
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
236+
RateLimiter remoteUploadRateLimiter = repository.remoteUploadRateLimiter();
237+
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
238+
RateLimiter remoteUploadLowPriorityRateLimiter = repository.remoteUploadLowPriorityRateLimiter();
239+
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
240+
RateLimiter remoteDownloadRateLimiter = repository.remoteDownloadRateLimiter();
241+
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
242+
243+
// adjust all the reloadable settings
244+
{
245+
rateBytes = rateBytes / 2;
246+
repoSettings = Settings.builder()
247+
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b"))
248+
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
249+
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
250+
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
251+
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
252+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
253+
client().admin().cluster(),
254+
repositoryName,
255+
FsRepository.TYPE,
256+
true,
257+
repoSettings
258+
);
259+
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
260+
assertThat(newRepository, sameInstance(repository));
261+
snapshotRateLimiter = newRepository.snapshotRateLimiter();
262+
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
263+
restoreRateLimiter = newRepository.restoreRateLimiter();
264+
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
265+
remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter();
266+
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
267+
remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter();
268+
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
269+
remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter();
270+
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
271+
}
272+
273+
// In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings
274+
{
275+
long newRateBytes = rateBytes / 2;
276+
repoSettings = Settings.builder()
277+
.put("location", path)
278+
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (newRateBytes + "b"))
279+
.put(MAX_RESTORE_BYTES_PER_SEC, (newRateBytes + "b"));
280+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
281+
client().admin().cluster(),
282+
repositoryName,
283+
FsRepository.TYPE,
284+
true,
285+
repoSettings
286+
);
287+
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
288+
assertThat(newRepository, sameInstance(repository));
289+
snapshotRateLimiter = newRepository.snapshotRateLimiter();
290+
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024)));
291+
restoreRateLimiter = newRepository.restoreRateLimiter();
292+
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024)));
293+
remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter();
294+
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
295+
remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter();
296+
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
297+
remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter();
298+
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
299+
}
300+
301+
// the new settings are not all equal to the old settings, so the repository will be not reloaded
302+
{
303+
rateBytes = rateBytes / 2;
304+
repoSettings = Settings.builder()
305+
.put("location", path)
306+
.put("io_buffer_size", "8mb")
307+
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
308+
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
309+
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
310+
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
311+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
312+
client().admin().cluster(),
313+
repositoryName,
314+
FsRepository.TYPE,
315+
true,
316+
repoSettings
317+
);
318+
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
319+
assertNotEquals(newRepository, repository);
320+
}
321+
}
197322
}

server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
5151
import org.opensearch.core.action.ActionListener;
5252
import org.opensearch.core.common.Strings;
53+
import org.opensearch.core.common.unit.ByteSizeUnit;
5354
import org.opensearch.core.rest.RestStatus;
5455
import org.opensearch.discovery.AbstractDisruptionTestCase;
5556
import org.opensearch.plugins.Plugin;
@@ -154,6 +155,7 @@ public void testSettingsUpdateFailWhenCreateSnapshotInProgress() throws Exceptio
154155
Thread.sleep(1000); // Wait for the snapshot to start
155156
assertFalse(createSlowFuture.isDone()); // Ensure the snapshot is still in progress
156157
// Attempt to update the repository settings while the snapshot is in progress
158+
settings.put("chunk_size", 2000, ByteSizeUnit.BYTES);
157159
IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", settings));
158160
// Verify that the update fails with an appropriate exception
159161
assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage());
@@ -180,10 +182,9 @@ public void testSettingsUpdateFailWhenDeleteSnapshotInProgress() throws Interrup
180182
Thread.sleep(1000); // Wait for the delete operation to start
181183
assertFalse(future.isDone()); // Ensure the delete operation is still in progress
182184
// Attempt to update the repository settings while the delete operation is in progress
183-
IllegalStateException ex = assertThrows(
184-
IllegalStateException.class,
185-
() -> updateRepository(repoName, "mock", randomRepositorySettings())
186-
);
185+
Settings.Builder newSettings = randomRepositorySettings();
186+
newSettings.put("chunk_size", 2000, ByteSizeUnit.BYTES);
187+
IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", newSettings));
187188
// Verify that the update fails with an appropriate exception
188189
assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage());
189190
unblockNode(repoName, clusterManagerName); // Unblock the delete operation

server/src/main/java/org/opensearch/repositories/RepositoriesService.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public RepositoriesService(
174174
public void registerOrUpdateRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
175175
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";
176176

177-
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
177+
RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
178178
request.name(),
179179
request.type(),
180180
request.settings(),
@@ -206,14 +206,32 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
206206
registrationListener = listener;
207207
}
208208

209-
// Trying to create the new repository on cluster-manager to make sure it works
210-
try {
211-
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
212-
} catch (Exception e) {
213-
registrationListener.onFailure(e);
214-
return;
209+
Repository currentRepository = repositories.get(request.name());
210+
boolean isReloadableSettings = currentRepository != null && currentRepository.isReloadableSettings(newRepositoryMetadata);
211+
212+
if (isReloadableSettings) {
213+
// We are reloading the repository, so we need to preserve the old settings in the new repository metadata
214+
Settings updatedSettings = Settings.builder()
215+
.put(currentRepository.getMetadata().settings())
216+
.put(newRepositoryMetadata.settings())
217+
.build();
218+
newRepositoryMetadata = new RepositoryMetadata(
219+
newRepositoryMetadata.name(),
220+
newRepositoryMetadata.type(),
221+
updatedSettings,
222+
newRepositoryMetadata.cryptoMetadata()
223+
);
224+
} else {
225+
// Trying to create the new repository on cluster-manager to make sure it works
226+
try {
227+
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
228+
} catch (Exception e) {
229+
registrationListener.onFailure(e);
230+
return;
231+
}
215232
}
216233

234+
final RepositoryMetadata finalRepositoryMetadata = newRepositoryMetadata;
217235
clusterService.submitStateUpdateTask(
218236
"put_repository [" + request.name() + "]",
219237
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
@@ -224,7 +242,9 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
224242

225243
@Override
226244
public ClusterState execute(ClusterState currentState) {
227-
ensureRepositoryNotInUse(currentState, request.name());
245+
if (isReloadableSettings == false) {
246+
ensureRepositoryNotInUse(currentState, request.name());
247+
}
228248
Metadata metadata = currentState.metadata();
229249
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
230250
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
@@ -245,17 +265,17 @@ public ClusterState execute(ClusterState currentState) {
245265
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
246266

247267
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
248-
RepositoryMetadata updatedRepositoryMetadata = newRepositoryMetadata;
268+
RepositoryMetadata updatedRepositoryMetadata = finalRepositoryMetadata;
249269
if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) {
250270
Settings updatedSettings = Settings.builder()
251-
.put(newRepositoryMetadata.settings())
271+
.put(finalRepositoryMetadata.settings())
252272
.put(SYSTEM_REPOSITORY_SETTING.getKey(), true)
253273
.build();
254274
updatedRepositoryMetadata = new RepositoryMetadata(
255-
newRepositoryMetadata.name(),
256-
newRepositoryMetadata.type(),
275+
finalRepositoryMetadata.name(),
276+
finalRepositoryMetadata.type(),
257277
updatedSettings,
258-
newRepositoryMetadata.cryptoMetadata()
278+
finalRepositoryMetadata.cryptoMetadata()
259279
);
260280
}
261281
if (repositoryMetadata.name().equals(updatedRepositoryMetadata.name())) {
@@ -481,7 +501,8 @@ public void applyClusterState(ClusterChangedEvent event) {
481501
if (previousMetadata.type().equals(repositoryMetadata.type()) == false
482502
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
483503
// Previous version is different from the version in settings
484-
if (repository.isSystemRepository() && repository.isReloadable()) {
504+
if ((repository.isSystemRepository() && repository.isReloadable())
505+
|| repository.isReloadableSettings(repositoryMetadata)) {
485506
logger.debug(
486507
"updating repository [{}] in-place to use new metadata [{}]",
487508
repositoryMetadata.name(),

server/src/main/java/org/opensearch/repositories/Repository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,10 @@ default boolean isReloadable() {
602602
return false;
603603
}
604604

605+
default boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) {
606+
return false;
607+
}
608+
605609
/**
606610
* Reload the repository inplace
607611
*/

0 commit comments

Comments
 (0)