|
32 | 32 |
|
33 | 33 | package org.opensearch.repositories;
|
34 | 34 |
|
| 35 | +import org.apache.lucene.store.RateLimiter; |
35 | 36 | import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
|
| 37 | +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; |
36 | 38 | import org.opensearch.cluster.metadata.RepositoryMetadata;
|
37 | 39 | import org.opensearch.common.settings.Settings;
|
38 | 40 | import org.opensearch.plugins.Plugin;
|
|
42 | 44 | import org.opensearch.test.OpenSearchIntegTestCase;
|
43 | 45 | import org.opensearch.transport.client.Client;
|
44 | 46 |
|
| 47 | +import java.nio.file.Path; |
45 | 48 | import java.util.Collection;
|
46 | 49 | import java.util.Collections;
|
| 50 | +import java.util.concurrent.ExecutionException; |
47 | 51 |
|
48 | 52 | import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
|
49 | 53 | import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
|
| 54 | +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_BYTES_PER_SEC; |
| 55 | +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_BYTES_PER_SEC; |
50 | 56 | import static org.hamcrest.Matchers.containsString;
|
51 | 57 | import static org.hamcrest.Matchers.equalTo;
|
52 | 58 | import static org.hamcrest.Matchers.hasSize;
|
@@ -188,4 +194,86 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
|
188 | 194 |
|
189 | 195 | thread.join();
|
190 | 196 | }
|
| 197 | + |
| 198 | + public void testAdjustBytesPerSecSettingForSnapAndRestore() throws ExecutionException, InterruptedException { |
| 199 | + final InternalTestCluster cluster = internalCluster(); |
| 200 | + final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class) |
| 201 | + .iterator() |
| 202 | + .next(); |
| 203 | + |
| 204 | + // create index |
| 205 | + String indexName = "test-index"; |
| 206 | + createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 0).put(SETTING_NUMBER_OF_SHARDS, 1).build()); |
| 207 | + index(indexName, "_doc", "1", Collections.singletonMap("user", generateRandomStringArray(1, 1 << 19, false, false))); |
| 208 | + flush(indexName); |
| 209 | + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(indexName); |
| 210 | + long indexSize = client().admin() |
| 211 | + .indices() |
| 212 | + .stats(indicesStatsRequest) |
| 213 | + .get() |
| 214 | + .getIndex(indexName) |
| 215 | + .getPrimaries() |
| 216 | + .getStore() |
| 217 | + .getSize() |
| 218 | + .getBytes(); |
| 219 | + |
| 220 | + // create repository |
| 221 | + final String repositoryName = "test-repo"; |
| 222 | + long rateBytes = indexSize / 20; |
| 223 | + Path path = randomRepoPath(); |
| 224 | + Settings.Builder repoSettings = Settings.builder() |
| 225 | + .put("location", path) |
| 226 | + .put(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, (rateBytes + "b")) |
| 227 | + .put(MAX_RESTORE_BYTES_BYTES_PER_SEC, (rateBytes + "b")); |
| 228 | + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( |
| 229 | + client().admin().cluster(), |
| 230 | + repositoryName, |
| 231 | + FsRepository.TYPE, |
| 232 | + true, |
| 233 | + repoSettings |
| 234 | + ); |
| 235 | + |
| 236 | + FsRepository repository = (FsRepository) repositoriesService.repository(repositoryName); |
| 237 | + RateLimiter snapshotRateLimiter = repository.snapshotRateLimiter(); |
| 238 | + assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); |
| 239 | + RateLimiter restoreRateLimiter = repository.restoreRateLimiter(); |
| 240 | + assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); |
| 241 | + |
| 242 | + // adjust the store and restore rate of snapshot |
| 243 | + rateBytes = indexSize / 30; |
| 244 | + repoSettings = Settings.builder() |
| 245 | + .put("location", path) |
| 246 | + .put(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, (rateBytes + "b")) |
| 247 | + .put(MAX_RESTORE_BYTES_BYTES_PER_SEC, (rateBytes + "b")); |
| 248 | + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( |
| 249 | + client().admin().cluster(), |
| 250 | + repositoryName, |
| 251 | + FsRepository.TYPE, |
| 252 | + true, |
| 253 | + repoSettings |
| 254 | + ); |
| 255 | + repository = (FsRepository) repositoriesService.repository(repositoryName); |
| 256 | + snapshotRateLimiter = repository.snapshotRateLimiter(); |
| 257 | + assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); |
| 258 | + restoreRateLimiter = repository.restoreRateLimiter(); |
| 259 | + assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); |
| 260 | + |
| 261 | + // only adjust the store and restore rate of snapshot, without path |
| 262 | + rateBytes = indexSize / 40; |
| 263 | + repoSettings = Settings.builder() |
| 264 | + .put(MAX_SNAPSHOT_BYTES_BYTES_PER_SEC, (rateBytes + "b")) |
| 265 | + .put(MAX_RESTORE_BYTES_BYTES_PER_SEC, (rateBytes + "b")); |
| 266 | + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( |
| 267 | + client().admin().cluster(), |
| 268 | + repositoryName, |
| 269 | + FsRepository.TYPE, |
| 270 | + true, |
| 271 | + repoSettings |
| 272 | + ); |
| 273 | + repository = (FsRepository) repositoriesService.repository(repositoryName); |
| 274 | + snapshotRateLimiter = repository.snapshotRateLimiter(); |
| 275 | + assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); |
| 276 | + restoreRateLimiter = repository.restoreRateLimiter(); |
| 277 | + assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); |
| 278 | + } |
191 | 279 | }
|
0 commit comments