Skip to content

Commit 41258e7

Browse files
Improve the rejection logic for soft mode query groups during node duress (#16417) (#16438)
* improve the rejection logic for wlm * add CHANGELOG --------- Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 3312eda commit 41258e7

File tree

3 files changed

+72
-17
lines changed

3 files changed

+72
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9393
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
9494
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
9595
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
96+
- [Workload Management] Enhance rejection mechanism in workload management ([#16417](https://github.com/opensearch-project/OpenSearch/pull/16417))
9697

9798
### Security
9899

server/src/main/java/org/opensearch/wlm/QueryGroupService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,12 @@ public void rejectIfNeeded(String queryGroupId) {
266266
return;
267267
}
268268

269-
// rejections will not happen for SOFT mode QueryGroups
269+
// rejections will not happen for SOFT mode QueryGroups unless node is in duress
270270
Optional<QueryGroup> optionalQueryGroup = activeQueryGroups.stream().filter(x -> x.get_id().equals(queryGroupId)).findFirst();
271271

272-
if (optionalQueryGroup.isPresent() && optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT)
273-
return;
272+
if (optionalQueryGroup.isPresent()
273+
&& (optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT
274+
&& !nodeDuressTrackers.isNodeInDuress())) return;
274275

275276
optionalQueryGroup.ifPresent(queryGroup -> {
276277
boolean reject = false;

server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static org.mockito.Mockito.when;
4949

5050
public class QueryGroupServiceTests extends OpenSearchTestCase {
51+
public static final String QUERY_GROUP_ID = "queryGroupId1";
5152
private QueryGroupService queryGroupService;
5253
private QueryGroupTaskCancellationService mockCancellationService;
5354
private ClusterService mockClusterService;
@@ -68,6 +69,7 @@ public void setUp() throws Exception {
6869
mockNodeDuressTrackers = Mockito.mock(NodeDuressTrackers.class);
6970
mockCancellationService = Mockito.mock(TestQueryGroupCancellationService.class);
7071
mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor();
72+
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(false);
7173

7274
queryGroupService = new QueryGroupService(
7375
mockCancellationService,
@@ -203,26 +205,52 @@ public void testRejectIfNeeded_whenQueryGroupIdIsNullOrDefaultOne() {
203205
verify(spyMap, never()).get(any());
204206
}
205207

208+
public void testRejectIfNeeded_whenSoftModeQueryGroupIsContendedAndNodeInDuress() {
209+
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
210+
"testQueryGroup",
211+
QUERY_GROUP_ID,
212+
MutableQueryGroupFragment.ResiliencyMode.SOFT,
213+
Map.of(ResourceType.CPU, 0.10)
214+
);
215+
mockQueryGroupStateMap = new HashMap<>();
216+
mockQueryGroupStateMap.put("queryGroupId1", new QueryGroupState());
217+
QueryGroupState state = new QueryGroupState();
218+
QueryGroupState.ResourceTypeState cpuResourceState = new QueryGroupState.ResourceTypeState(ResourceType.CPU);
219+
cpuResourceState.setLastRecordedUsage(0.10);
220+
state.getResourceState().put(ResourceType.CPU, cpuResourceState);
221+
QueryGroupState spyState = spy(state);
222+
mockQueryGroupStateMap.put(QUERY_GROUP_ID, spyState);
223+
224+
mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);
225+
226+
queryGroupService = new QueryGroupService(
227+
mockCancellationService,
228+
mockClusterService,
229+
mockThreadPool,
230+
mockWorkloadManagementSettings,
231+
mockNodeDuressTrackers,
232+
mockQueryGroupsStateAccessor,
233+
activeQueryGroups,
234+
new HashSet<>()
235+
);
236+
when(mockWorkloadManagementSettings.getWlmMode()).thenReturn(WlmMode.ENABLED);
237+
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(true);
238+
assertThrows(OpenSearchRejectedExecutionException.class, () -> queryGroupService.rejectIfNeeded("queryGroupId1"));
239+
}
240+
206241
public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
207-
QueryGroup testQueryGroup = new QueryGroup(
242+
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
208243
"testQueryGroup",
209-
"queryGroupId1",
210-
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.10)),
211-
1L
244+
QUERY_GROUP_ID,
245+
MutableQueryGroupFragment.ResiliencyMode.SOFT,
246+
Map.of(ResourceType.CPU, 0.10)
212247
);
213-
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
214-
{
215-
add(testQueryGroup);
216-
}
217-
};
218248
mockQueryGroupStateMap = new HashMap<>();
219249
QueryGroupState spyState = spy(new QueryGroupState());
220250
mockQueryGroupStateMap.put("queryGroupId1", spyState);
221251

222252
mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);
223253

224-
Map<String, QueryGroupState> spyMap = spy(mockQueryGroupStateMap);
225-
226254
queryGroupService = new QueryGroupService(
227255
mockCancellationService,
228256
mockClusterService,
@@ -239,11 +267,11 @@ public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
239267
}
240268

241269
public void testRejectIfNeeded_whenQueryGroupIsEnforcedMode_andNotBreaching() {
242-
QueryGroup testQueryGroup = new QueryGroup(
270+
QueryGroup testQueryGroup = getQueryGroup(
243271
"testQueryGroup",
244272
"queryGroupId1",
245-
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.10)),
246-
1L
273+
MutableQueryGroupFragment.ResiliencyMode.ENFORCED,
274+
Map.of(ResourceType.CPU, 0.10)
247275
);
248276
QueryGroup spuQueryGroup = spy(testQueryGroup);
249277
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
@@ -464,6 +492,31 @@ public void testShouldSBPHandle() {
464492

465493
}
466494

495+
private static Set<QueryGroup> getActiveQueryGroups(
496+
String name,
497+
String id,
498+
MutableQueryGroupFragment.ResiliencyMode mode,
499+
Map<ResourceType, Double> resourceLimits
500+
) {
501+
QueryGroup testQueryGroup = getQueryGroup(name, id, mode, resourceLimits);
502+
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
503+
{
504+
add(testQueryGroup);
505+
}
506+
};
507+
return activeQueryGroups;
508+
}
509+
510+
private static QueryGroup getQueryGroup(
511+
String name,
512+
String id,
513+
MutableQueryGroupFragment.ResiliencyMode mode,
514+
Map<ResourceType, Double> resourceLimits
515+
) {
516+
QueryGroup testQueryGroup = new QueryGroup(name, id, new MutableQueryGroupFragment(mode, resourceLimits), 1L);
517+
return testQueryGroup;
518+
}
519+
467520
// This is needed to test the behavior of QueryGroupService#doRun method
468521
static class TestQueryGroupCancellationService extends QueryGroupTaskCancellationService {
469522
public TestQueryGroupCancellationService(

0 commit comments

Comments
 (0)