Skip to content

Commit 4f50b4d

Browse files
authored
Schedule reroute after allocator timed out (#15565)
* Schedule reroute after allocator timed out Signed-off-by: Rishab Nahata <[email protected]>
1 parent f33c786 commit 4f50b4d

File tree

9 files changed

+321
-80
lines changed

9 files changed

+321
-80
lines changed

server/src/main/java/org/opensearch/cluster/ClusterModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.opensearch.cluster.metadata.ViewMetadata;
5454
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
5555
import org.opensearch.cluster.routing.DelayedAllocationService;
56+
import org.opensearch.cluster.routing.RerouteService;
5657
import org.opensearch.cluster.routing.allocation.AllocationService;
5758
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
5859
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
@@ -479,4 +480,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
479480
allocationService.setExistingShardsAllocators(existingShardsAllocators);
480481
}
481482

483+
public void setRerouteServiceForAllocator(RerouteService rerouteService) {
484+
shardsAllocator.setRerouteService(rerouteService);
485+
}
482486
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.lucene.util.IntroSorter;
38+
import org.opensearch.cluster.routing.RerouteService;
3839
import org.opensearch.cluster.routing.RoutingNode;
3940
import org.opensearch.cluster.routing.RoutingNodes;
4041
import org.opensearch.cluster.routing.ShardMovementStrategy;
@@ -49,12 +50,14 @@
4950
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
5051
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
5152
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
53+
import org.opensearch.common.Priority;
5254
import org.opensearch.common.inject.Inject;
5355
import org.opensearch.common.settings.ClusterSettings;
5456
import org.opensearch.common.settings.Setting;
5557
import org.opensearch.common.settings.Setting.Property;
5658
import org.opensearch.common.settings.Settings;
5759
import org.opensearch.common.unit.TimeValue;
60+
import org.opensearch.core.action.ActionListener;
5861

5962
import java.util.HashMap;
6063
import java.util.HashSet;
@@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
202205
private volatile boolean ignoreThrottleInRestore;
203206
private volatile TimeValue allocatorTimeout;
204207
private long startTime;
208+
private RerouteService rerouteService;
205209

206210
public BalancedShardsAllocator(Settings settings) {
207211
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
@@ -231,6 +235,12 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
231235
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
232236
}
233237

238+
@Override
239+
public void setRerouteService(RerouteService rerouteService) {
240+
assert this.rerouteService == null : "RerouteService is already set";
241+
this.rerouteService = rerouteService;
242+
}
243+
234244
/**
235245
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
236246
*/
@@ -342,6 +352,7 @@ public void allocate(RoutingAllocation allocation) {
342352
localShardsBalancer.allocateUnassigned();
343353
localShardsBalancer.moveShards();
344354
localShardsBalancer.balance();
355+
scheduleRerouteIfAllocatorTimedOut();
345356

346357
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
347358
remoteShardsBalancer.allocateUnassigned();
@@ -404,6 +415,20 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
404415
}
405416
}
406417

418+
private void scheduleRerouteIfAllocatorTimedOut() {
419+
if (allocatorTimedOut()) {
420+
assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
421+
rerouteService.reroute(
422+
"reroute after balanced shards allocator timed out",
423+
Priority.HIGH,
424+
ActionListener.wrap(
425+
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
426+
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
427+
)
428+
);
429+
}
430+
}
431+
407432
/**
408433
* Returns the currently configured delta threshold
409434
*/

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.cluster.routing.allocation.allocator;
3434

35+
import org.opensearch.cluster.routing.RerouteService;
3536
import org.opensearch.cluster.routing.ShardRouting;
3637
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
3738
import org.opensearch.cluster.routing.allocation.MoveDecision;
@@ -73,4 +74,6 @@ public interface ShardsAllocator {
7374
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
7475
*/
7576
ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);
77+
78+
default void setRerouteService(RerouteService rerouteService) {}
7679
}

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ public void cleanCaches() {
184184

185185
// for tests
186186
protected ShardsBatchGatewayAllocator() {
187-
this(DEFAULT_SHARD_BATCH_SIZE);
187+
this(DEFAULT_SHARD_BATCH_SIZE, null);
188188
}
189189

190-
protected ShardsBatchGatewayAllocator(long batchSize) {
191-
this.rerouteService = null;
190+
protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) {
191+
this.rerouteService = rerouteService;
192192
this.batchStartedAction = null;
193193
this.primaryShardBatchAllocator = null;
194194
this.batchStoreAction = null;
@@ -297,6 +297,18 @@ public void run() {
297297
public void onComplete() {
298298
logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size());
299299
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true);
300+
if (timedOutPrimaryShardIds.isEmpty() == false) {
301+
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
302+
assert rerouteService != null;
303+
rerouteService.reroute(
304+
"reroute after existing shards allocator timed out",
305+
Priority.HIGH,
306+
ActionListener.wrap(
307+
r -> logger.trace("reroute after existing shards allocator timed out completed"),
308+
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
309+
)
310+
);
311+
}
300312
}
301313
};
302314
} else {
@@ -320,6 +332,18 @@ public void run() {
320332
public void onComplete() {
321333
logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
322334
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
335+
if (timedOutReplicaShardIds.isEmpty() == false) {
336+
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
337+
assert rerouteService != null;
338+
rerouteService.reroute(
339+
"reroute after existing shards allocator timed out",
340+
Priority.HIGH,
341+
ActionListener.wrap(
342+
r -> logger.trace("reroute after existing shards allocator timed out completed"),
343+
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
344+
)
345+
);
346+
}
323347
}
324348
};
325349
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ protected Node(
871871
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
872872
rerouteServiceReference.set(rerouteService);
873873
clusterService.setRerouteService(rerouteService);
874+
clusterModule.setRerouteServiceForAllocator(rerouteService);
874875

875876
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
876877

server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,19 @@ public void testQueryGroupMetadataRegister() {
337337
);
338338
}
339339

340+
public void testRerouteServiceSetForBalancedShardsAllocator() {
341+
ClusterModule clusterModule = new ClusterModule(
342+
Settings.EMPTY,
343+
clusterService,
344+
Collections.emptyList(),
345+
clusterInfoService,
346+
null,
347+
threadContext,
348+
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
349+
);
350+
clusterModule.setRerouteServiceForAllocator((reason, priority, listener) -> listener.onResponse(clusterService.state()));
351+
}
352+
340353
private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) {
341354
return new ClusterPlugin() {
342355
@Override

0 commit comments

Comments
 (0)