8
8
9
9
package org .opensearch .gateway .remote ;
10
10
11
+ import org .opensearch .action .admin .cluster .node .stats .NodesStatsRequest ;
12
+ import org .opensearch .action .admin .cluster .node .stats .NodesStatsResponse ;
11
13
import org .opensearch .action .admin .cluster .settings .ClusterUpdateSettingsResponse ;
14
+ import org .opensearch .cluster .coordination .PersistedStateStats ;
15
+ import org .opensearch .cluster .routing .IndexRoutingTable ;
12
16
import org .opensearch .common .blobstore .BlobPath ;
13
17
import org .opensearch .common .settings .Settings ;
18
+ import org .opensearch .discovery .DiscoveryStats ;
19
+ import org .opensearch .gateway .remote .model .RemoteRoutingTableBlobStore ;
20
+ import org .opensearch .index .remote .RemoteStoreEnums ;
21
+ import org .opensearch .index .remote .RemoteStorePathStrategy ;
14
22
import org .opensearch .remotestore .RemoteStoreBaseIntegTestCase ;
15
23
import org .opensearch .repositories .RepositoriesService ;
16
24
import org .opensearch .repositories .blobstore .BlobStoreRepository ;
17
25
import org .opensearch .test .OpenSearchIntegTestCase ;
18
26
import org .junit .Before ;
19
27
20
28
import java .nio .charset .StandardCharsets ;
29
+ import java .nio .file .Path ;
30
+ import java .util .ArrayList ;
21
31
import java .util .Base64 ;
32
+ import java .util .List ;
22
33
import java .util .Map ;
23
34
import java .util .concurrent .TimeUnit ;
35
+ import java .util .concurrent .atomic .AtomicLong ;
24
36
37
+ import static org .opensearch .common .util .FeatureFlags .REMOTE_PUBLICATION_EXPERIMENTAL ;
25
38
import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT ;
26
39
import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING ;
27
40
import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .RETAINED_MANIFESTS ;
28
41
import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .SKIP_CLEANUP_STATE_CHANGES ;
29
42
import static org .opensearch .gateway .remote .RemoteClusterStateService .REMOTE_CLUSTER_STATE_ENABLED_SETTING ;
43
+ import static org .opensearch .gateway .remote .routingtable .RemoteIndexRoutingTable .INDEX_ROUTING_TABLE ;
30
44
import static org .opensearch .indices .IndicesService .CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING ;
45
+ import static org .opensearch .node .remotestore .RemoteStoreNodeAttribute .REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY ;
31
46
32
47
@ OpenSearchIntegTestCase .ClusterScope (scope = OpenSearchIntegTestCase .Scope .TEST , numDataNodes = 0 )
33
48
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
34
49
35
50
private static final String INDEX_NAME = "test-index" ;
51
+ private final RemoteStoreEnums .PathType pathType = RemoteStoreEnums .PathType .HASHED_PREFIX ;
36
52
37
53
@ Before
38
54
public void setup () {
@@ -52,6 +68,11 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
52
68
return indexStats ;
53
69
}
54
70
71
+ private void initialTestSetup (int shardCount , int replicaCount , int dataNodeCount , int clusterManagerNodeCount , Settings settings ) {
72
+ prepareCluster (clusterManagerNodeCount , dataNodeCount , INDEX_NAME , replicaCount , shardCount , settings );
73
+ ensureGreen (INDEX_NAME );
74
+ }
75
+
55
76
public void testRemoteCleanupTaskUpdated () {
56
77
int shardCount = randomIntBetween (1 , 2 );
57
78
int replicaCount = 1 ;
@@ -144,6 +165,102 @@ public void testRemoteCleanupDeleteStale() throws Exception {
144
165
assertTrue (response .isAcknowledged ());
145
166
}
146
167
168
+ public void testRemoteCleanupDeleteStaleIndexRoutingFiles () throws Exception {
169
+ clusterSettingsSuppliedByTest = true ;
170
+ Path segmentRepoPath = randomRepoPath ();
171
+ Path translogRepoPath = randomRepoPath ();
172
+ Path remoteRoutingTableRepoPath = randomRepoPath ();
173
+ Settings .Builder settingsBuilder = Settings .builder ();
174
+ settingsBuilder .put (
175
+ buildRemoteStoreNodeAttributes (
176
+ REPOSITORY_NAME ,
177
+ segmentRepoPath ,
178
+ REPOSITORY_2_NAME ,
179
+ translogRepoPath ,
180
+ REMOTE_ROUTING_TABLE_REPO ,
181
+ remoteRoutingTableRepoPath ,
182
+ false
183
+ )
184
+ );
185
+ settingsBuilder .put (
186
+ RemoteRoutingTableBlobStore .REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING .getKey (),
187
+ RemoteStoreEnums .PathType .HASHED_PREFIX .toString ()
188
+ )
189
+ .put ("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY , REMOTE_ROUTING_TABLE_REPO )
190
+ .put (REMOTE_PUBLICATION_EXPERIMENTAL , true );
191
+
192
+ int shardCount = randomIntBetween (1 , 2 );
193
+ int replicaCount = 1 ;
194
+ int dataNodeCount = shardCount * (replicaCount + 1 );
195
+ int clusterManagerNodeCount = 1 ;
196
+ initialTestSetup (shardCount , replicaCount , dataNodeCount , clusterManagerNodeCount , settingsBuilder .build ());
197
+
198
+ // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
199
+ // to repository, if manifest files are less than that it means clean up has run
200
+ updateClusterStateNTimes (RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1 );
201
+
202
+ RepositoriesService repositoriesService = internalCluster ().getClusterManagerNodeInstance (RepositoriesService .class );
203
+ BlobStoreRepository repository = (BlobStoreRepository ) repositoriesService .repository (REPOSITORY_NAME );
204
+ BlobPath baseMetadataPath = getBaseMetadataPath (repository );
205
+
206
+ BlobStoreRepository routingTableRepository = (BlobStoreRepository ) repositoriesService .repository (REMOTE_ROUTING_TABLE_REPO );
207
+ List <IndexRoutingTable > indexRoutingTables = new ArrayList <>(getClusterState ().routingTable ().indicesRouting ().values ());
208
+ BlobPath indexRoutingPath = getIndexRoutingPath (baseMetadataPath , indexRoutingTables .get (0 ).getIndex ().getUUID ());
209
+ assertBusy (() -> {
210
+ // There would be >=3 files as shards will transition from UNASSIGNED -> INIT -> STARTED state
211
+ assertTrue (routingTableRepository .blobStore ().blobContainer (indexRoutingPath ).listBlobs ().size () >= 3 );
212
+ });
213
+
214
+ RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster ().getClusterManagerNodeInstance (
215
+ RemoteClusterStateCleanupManager .class
216
+ );
217
+
218
+ // set cleanup interval to 100 ms to make the test faster
219
+ ClusterUpdateSettingsResponse response = client ().admin ()
220
+ .cluster ()
221
+ .prepareUpdateSettings ()
222
+ .setPersistentSettings (Settings .builder ().put (REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING .getKey (), "100ms" ))
223
+ .get ();
224
+
225
+ assertTrue (response .isAcknowledged ());
226
+ assertBusy (() -> assertEquals (100 , remoteClusterStateCleanupManager .getStaleFileDeletionTask ().getInterval ().getMillis ()));
227
+
228
+ String clusterManagerNode = internalCluster ().getClusterManagerName ();
229
+ NodesStatsResponse nodesStatsResponse = client ().admin ()
230
+ .cluster ()
231
+ .prepareNodesStats (clusterManagerNode )
232
+ .addMetric (NodesStatsRequest .Metric .DISCOVERY .metricName ())
233
+ .get ();
234
+ verifyIndexRoutingFilesDeletion (routingTableRepository , indexRoutingPath , nodesStatsResponse );
235
+
236
+ // disable the clean up to avoid race condition during shutdown
237
+ response = client ().admin ()
238
+ .cluster ()
239
+ .prepareUpdateSettings ()
240
+ .setPersistentSettings (Settings .builder ().put (REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING .getKey (), "-1" ))
241
+ .get ();
242
+ assertTrue (response .isAcknowledged ());
243
+ }
244
+
245
+ private void verifyIndexRoutingFilesDeletion (
246
+ BlobStoreRepository routingTableRepository ,
247
+ BlobPath indexRoutingPath ,
248
+ NodesStatsResponse nodesStatsResponse
249
+ ) throws Exception {
250
+ assertBusy (() -> { assertEquals (1 , routingTableRepository .blobStore ().blobContainer (indexRoutingPath ).listBlobs ().size ()); });
251
+
252
+ // Verify index routing files delete stats
253
+ DiscoveryStats discoveryStats = nodesStatsResponse .getNodes ().get (0 ).getDiscoveryStats ();
254
+ assertNotNull (discoveryStats .getClusterStateStats ());
255
+ for (PersistedStateStats persistedStateStats : discoveryStats .getClusterStateStats ().getPersistenceStats ()) {
256
+ Map <String , AtomicLong > extendedFields = persistedStateStats .getExtendedFields ();
257
+ assertTrue (extendedFields .containsKey (RemotePersistenceStats .INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT ));
258
+ long cleanupAttemptFailedCount = extendedFields .get (RemotePersistenceStats .INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT )
259
+ .get ();
260
+ assertEquals (0 , cleanupAttemptFailedCount );
261
+ }
262
+ }
263
+
147
264
private void updateClusterStateNTimes (int n ) {
148
265
int newReplicaCount = randomIntBetween (0 , 3 );
149
266
for (int i = n ; i > 0 ; i --) {
@@ -155,4 +272,25 @@ private void updateClusterStateNTimes(int n) {
155
272
assertTrue (response .isAcknowledged ());
156
273
}
157
274
}
275
+
276
+ private BlobPath getBaseMetadataPath (BlobStoreRepository repository ) {
277
+ return repository .basePath ()
278
+ .add (
279
+ Base64 .getUrlEncoder ()
280
+ .withoutPadding ()
281
+ .encodeToString (getClusterState ().getClusterName ().value ().getBytes (StandardCharsets .UTF_8 ))
282
+ )
283
+ .add ("cluster-state" )
284
+ .add (getClusterState ().metadata ().clusterUUID ());
285
+ }
286
+
287
+ private BlobPath getIndexRoutingPath (BlobPath baseMetadataPath , String indexUUID ) {
288
+ return pathType .path (
289
+ RemoteStorePathStrategy .BasePathInput .builder ()
290
+ .basePath (baseMetadataPath .add (INDEX_ROUTING_TABLE ))
291
+ .indexUUID (indexUUID )
292
+ .build (),
293
+ RemoteStoreEnums .PathHashAlgorithm .FNV_1A_BASE64
294
+ );
295
+ }
158
296
}
0 commit comments