32
32
33
33
package org .opensearch .repositories ;
34
34
35
+ import org .apache .lucene .store .RateLimiter ;
35
36
import org .opensearch .action .admin .cluster .repositories .get .GetRepositoriesResponse ;
36
37
import org .opensearch .cluster .metadata .RepositoryMetadata ;
37
38
import org .opensearch .common .settings .Settings ;
42
43
import org .opensearch .test .OpenSearchIntegTestCase ;
43
44
import org .opensearch .transport .client .Client ;
44
45
46
+ import java .nio .file .Path ;
45
47
import java .util .Collection ;
46
48
import java .util .Collections ;
47
49
import java .util .concurrent .atomic .AtomicInteger ;
48
50
49
51
import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
50
52
import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
53
+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC ;
54
+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC ;
55
+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_REMOTE_UPLOAD_BYTES_PER_SEC ;
56
+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_RESTORE_BYTES_PER_SEC ;
57
+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_SNAPSHOT_BYTES_PER_SEC ;
51
58
import static org .hamcrest .Matchers .containsString ;
52
59
import static org .hamcrest .Matchers .equalTo ;
53
60
import static org .hamcrest .Matchers .hasSize ;
@@ -138,10 +145,11 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
138
145
139
146
// create repository
140
147
final String repositoryName = "test-repo" ;
148
+ Path path = randomRepoPath ();
141
149
Settings .Builder repoSettings = Settings .builder ()
142
- .put ("location" , randomRepoPath () )
143
- .put ("max_snapshot_bytes_per_sec" , "10mb" )
144
- .put ("max_restore_bytes_per_sec" , "10mb" );
150
+ .put ("location" , path )
151
+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , "10mb" )
152
+ .put (MAX_RESTORE_BYTES_PER_SEC , "10mb" );
145
153
OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
146
154
client ().admin ().cluster (),
147
155
repositoryName ,
@@ -176,7 +184,7 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
176
184
177
185
try {
178
186
logger .info ("--> begin to reset repository" );
179
- repoSettings = Settings .builder ().put ("location" , randomRepoPath ()).put ("max_snapshot_bytes_per_sec" , "300mb" );
187
+ repoSettings = Settings .builder ().put ("location" , randomRepoPath ()).put (MAX_SNAPSHOT_BYTES_PER_SEC , "300mb" );
180
188
OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
181
189
client ().admin ().cluster (),
182
190
repositoryName ,
@@ -194,4 +202,121 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
194
202
195
203
thread .join ();
196
204
}
205
+
206
+ public void testAdjustBytesPerSecSettingForSnapAndRestore () {
207
+ final InternalTestCluster cluster = internalCluster ();
208
+ final RepositoriesService repositoriesService = cluster .getDataOrClusterManagerNodeInstances (RepositoriesService .class )
209
+ .iterator ()
210
+ .next ();
211
+
212
+ // create repository
213
+ final String repositoryName = "test-repo1" ;
214
+ long rateBytes = 200000 ;
215
+ Path path = randomRepoPath ();
216
+ Settings .Builder repoSettings = Settings .builder ()
217
+ .put ("location" , path )
218
+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , (rateBytes + "b" ))
219
+ .put (MAX_RESTORE_BYTES_PER_SEC , (rateBytes + "b" ))
220
+ .put (MAX_REMOTE_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
221
+ .put (MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
222
+ .put (MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC , (rateBytes + "b" ));
223
+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
224
+ client ().admin ().cluster (),
225
+ repositoryName ,
226
+ FsRepository .TYPE ,
227
+ true ,
228
+ repoSettings
229
+ );
230
+
231
+ FsRepository repository = (FsRepository ) repositoriesService .repository (repositoryName );
232
+ RateLimiter snapshotRateLimiter = repository .snapshotRateLimiter ();
233
+ assertThat (snapshotRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
234
+ RateLimiter restoreRateLimiter = repository .restoreRateLimiter ();
235
+ assertThat (restoreRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
236
+ RateLimiter remoteUploadRateLimiter = repository .remoteUploadRateLimiter ();
237
+ assertThat (remoteUploadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
238
+ RateLimiter remoteUploadLowPriorityRateLimiter = repository .remoteUploadLowPriorityRateLimiter ();
239
+ assertThat (remoteUploadLowPriorityRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
240
+ RateLimiter remoteDownloadRateLimiter = repository .remoteDownloadRateLimiter ();
241
+ assertThat (remoteDownloadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
242
+
243
+ // adjust all the reloadable settings
244
+ {
245
+ rateBytes = rateBytes / 2 ;
246
+ repoSettings = Settings .builder ()
247
+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , (rateBytes + "b" ))
248
+ .put (MAX_RESTORE_BYTES_PER_SEC , (rateBytes + "b" ))
249
+ .put (MAX_REMOTE_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
250
+ .put (MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
251
+ .put (MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC , (rateBytes + "b" ));
252
+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
253
+ client ().admin ().cluster (),
254
+ repositoryName ,
255
+ FsRepository .TYPE ,
256
+ true ,
257
+ repoSettings
258
+ );
259
+ FsRepository newRepository = (FsRepository ) repositoriesService .repository (repositoryName );
260
+ assertThat (newRepository , sameInstance (repository ));
261
+ snapshotRateLimiter = newRepository .snapshotRateLimiter ();
262
+ assertThat (snapshotRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
263
+ restoreRateLimiter = newRepository .restoreRateLimiter ();
264
+ assertThat (restoreRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
265
+ remoteUploadRateLimiter = newRepository .remoteUploadRateLimiter ();
266
+ assertThat (remoteUploadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
267
+ remoteUploadLowPriorityRateLimiter = newRepository .remoteUploadLowPriorityRateLimiter ();
268
+ assertThat (remoteUploadLowPriorityRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
269
+ remoteDownloadRateLimiter = newRepository .remoteDownloadRateLimiter ();
270
+ assertThat (remoteDownloadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
271
+ }
272
+
273
+ // In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings
274
+ {
275
+ long newRateBytes = rateBytes / 2 ;
276
+ repoSettings = Settings .builder ()
277
+ .put ("location" , path )
278
+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , (newRateBytes + "b" ))
279
+ .put (MAX_RESTORE_BYTES_PER_SEC , (newRateBytes + "b" ));
280
+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
281
+ client ().admin ().cluster (),
282
+ repositoryName ,
283
+ FsRepository .TYPE ,
284
+ true ,
285
+ repoSettings
286
+ );
287
+ FsRepository newRepository = (FsRepository ) repositoriesService .repository (repositoryName );
288
+ assertThat (newRepository , sameInstance (repository ));
289
+ snapshotRateLimiter = newRepository .snapshotRateLimiter ();
290
+ assertThat (snapshotRateLimiter .getMBPerSec (), equalTo ((double ) newRateBytes / (1024 * 1024 )));
291
+ restoreRateLimiter = newRepository .restoreRateLimiter ();
292
+ assertThat (restoreRateLimiter .getMBPerSec (), equalTo ((double ) newRateBytes / (1024 * 1024 )));
293
+ remoteUploadRateLimiter = newRepository .remoteUploadRateLimiter ();
294
+ assertThat (remoteUploadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
295
+ remoteUploadLowPriorityRateLimiter = newRepository .remoteUploadLowPriorityRateLimiter ();
296
+ assertThat (remoteUploadLowPriorityRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
297
+ remoteDownloadRateLimiter = newRepository .remoteDownloadRateLimiter ();
298
+ assertThat (remoteDownloadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
299
+ }
300
+
301
+ // the new settings are not all equal to the old settings, so the repository will be not reloaded
302
+ {
303
+ rateBytes = rateBytes / 2 ;
304
+ repoSettings = Settings .builder ()
305
+ .put ("location" , path )
306
+ .put ("io_buffer_size" , "8mb" )
307
+ .put (MAX_RESTORE_BYTES_PER_SEC , (rateBytes + "b" ))
308
+ .put (MAX_REMOTE_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
309
+ .put (MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
310
+ .put (MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC , (rateBytes + "b" ));
311
+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
312
+ client ().admin ().cluster (),
313
+ repositoryName ,
314
+ FsRepository .TYPE ,
315
+ true ,
316
+ repoSettings
317
+ );
318
+ FsRepository newRepository = (FsRepository ) repositoriesService .repository (repositoryName );
319
+ assertNotEquals (newRepository , repository );
320
+ }
321
+ }
197
322
}
0 commit comments