Skip to content

Commit 78688fd

Browse files
committed
Add query group level rejection logic (#15428)
* add rejection listener Signed-off-by: Kaushal Kumar <[email protected]> * add rejection listener unit test Signed-off-by: Kaushal Kumar <[email protected]> * add rejection logic for shard level requests Signed-off-by: Kaushal Kumar <[email protected]> * add changelog entry Signed-off-by: Kaushal Kumar <[email protected]> * apply spotless check Signed-off-by: Kaushal Kumar <[email protected]> * remove unused files and fix precommit Signed-off-by: Kaushal Kumar <[email protected]> * refactor code Signed-off-by: Kaushal Kumar <[email protected]> * add package info file Signed-off-by: Kaushal Kumar <[email protected]> * remove unused method from QueryGroupService stub Signed-off-by: Kaushal Kumar <[email protected]> --------- Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 3c9e08b commit 78688fd

9 files changed

+180
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
3232
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
3333
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
34+
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
3435

3536
### Dependencies
3637
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,9 @@
263263
import org.opensearch.transport.TransportService;
264264
import org.opensearch.usage.UsageService;
265265
import org.opensearch.watcher.ResourceWatcherService;
266+
import org.opensearch.wlm.QueryGroupService;
266267
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
268+
import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener;
267269

268270
import javax.net.ssl.SNIHostName;
269271

@@ -996,11 +998,22 @@ protected Node(
996998
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
997999
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);
9981000

1001+
final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener =
1002+
new QueryGroupRequestRejectionOperationListener(
1003+
new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService
1004+
threadPool
1005+
);
1006+
9991007
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
10001008
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
10011009
new SearchRequestOperationsCompositeListenerFactory(
10021010
Stream.concat(
1003-
Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener),
1011+
Stream.of(
1012+
searchRequestStats,
1013+
searchRequestSlowLog,
1014+
searchTaskRequestOperationsListener,
1015+
queryGroupRequestRejectionListener
1016+
),
10041017
pluginComponents.stream()
10051018
.filter(p -> p instanceof SearchRequestOperationsListener)
10061019
.map(p -> (SearchRequestOperationsListener) p)
@@ -1050,7 +1063,8 @@ protected Node(
10501063
);
10511064

10521065
WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
1053-
threadPool
1066+
threadPool,
1067+
new QueryGroupService() // We will need to replace this with actual implementation
10541068
);
10551069

10561070
final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.wlm;
10+
11+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
12+
13+
/**
14+
* This is stub at this point in time and will be replace by an acutal one in couple of days
15+
*/
16+
public class QueryGroupService {
17+
/**
18+
*
19+
* @param queryGroupId query group identifier
20+
*/
21+
public void rejectIfNeeded(String queryGroupId) {
22+
if (queryGroupId == null) return;
23+
boolean reject = false;
24+
final StringBuilder reason = new StringBuilder();
25+
// TODO: At this point this is dummy and we need to decide whether to cancel the request based on last
26+
// reported resource usage for the queryGroup. We also need to increment the rejection count here for the
27+
// query group
28+
if (reject) {
29+
throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.toString());
30+
}
31+
}
32+
}

server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
*/
2121
public class WorkloadManagementTransportInterceptor implements TransportInterceptor {
2222
private final ThreadPool threadPool;
23+
private final QueryGroupService queryGroupService;
2324

24-
public WorkloadManagementTransportInterceptor(ThreadPool threadPool) {
25+
public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final QueryGroupService queryGroupService) {
2526
this.threadPool = threadPool;
27+
this.queryGroupService = queryGroupService;
2628
}
2729

2830
@Override
@@ -32,7 +34,7 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
3234
boolean forceExecution,
3335
TransportRequestHandler<T> actualHandler
3436
) {
35-
return new RequestHandler<T>(threadPool, actualHandler);
37+
return new RequestHandler<T>(threadPool, actualHandler, queryGroupService);
3638
}
3739

3840
/**
@@ -43,16 +45,20 @@ public static class RequestHandler<T extends TransportRequest> implements Transp
4345

4446
private final ThreadPool threadPool;
4547
TransportRequestHandler<T> actualHandler;
48+
private final QueryGroupService queryGroupService;
4649

47-
public RequestHandler(ThreadPool threadPool, TransportRequestHandler<T> actualHandler) {
50+
public RequestHandler(ThreadPool threadPool, TransportRequestHandler<T> actualHandler, QueryGroupService queryGroupService) {
4851
this.threadPool = threadPool;
4952
this.actualHandler = actualHandler;
53+
this.queryGroupService = queryGroupService;
5054
}
5155

5256
@Override
5357
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
5458
if (isSearchWorkloadRequest(task)) {
5559
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
60+
final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId();
61+
queryGroupService.rejectIfNeeded(queryGroupId);
5662
}
5763
actualHandler.messageReceived(request, channel, task);
5864
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.wlm.listeners;
10+
11+
import org.opensearch.action.search.SearchRequestContext;
12+
import org.opensearch.action.search.SearchRequestOperationsListener;
13+
import org.opensearch.threadpool.ThreadPool;
14+
import org.opensearch.wlm.QueryGroupService;
15+
import org.opensearch.wlm.QueryGroupTask;
16+
17+
/**
18+
* This listener is used to perform the rejections for incoming requests into a queryGroup
19+
*/
20+
public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener {
21+
22+
private final QueryGroupService queryGroupService;
23+
private final ThreadPool threadPool;
24+
25+
public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
26+
this.queryGroupService = queryGroupService;
27+
this.threadPool = threadPool;
28+
}
29+
30+
/**
31+
* This method assumes that the queryGroupId is already populated in the thread context
32+
* @param searchRequestContext SearchRequestContext instance
33+
*/
34+
@Override
35+
protected void onRequestStart(SearchRequestContext searchRequestContext) {
36+
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
37+
queryGroupService.rejectIfNeeded(queryGroupId);
38+
}
39+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* WLM related listener constructs
11+
*/
12+
package org.opensearch.wlm.listeners;

server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestC
2525
public void setUp() throws Exception {
2626
super.setUp();
2727
threadPool = new TestThreadPool(getTestName());
28-
sut = new WorkloadManagementTransportInterceptor(threadPool);
28+
sut = new WorkloadManagementTransportInterceptor(threadPool, new QueryGroupService());
2929
}
3030

3131
public void tearDown() throws Exception {

server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.wlm;
1010

1111
import org.opensearch.action.index.IndexRequest;
12+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
1213
import org.opensearch.search.internal.ShardSearchRequest;
1314
import org.opensearch.tasks.Task;
1415
import org.opensearch.test.OpenSearchTestCase;
@@ -20,36 +21,49 @@
2021

2122
import java.util.Collections;
2223

24+
import static org.mockito.Mockito.anyString;
25+
import static org.mockito.Mockito.doNothing;
26+
import static org.mockito.Mockito.doThrow;
2327
import static org.mockito.Mockito.mock;
2428
import static org.mockito.Mockito.spy;
2529

2630
public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase {
2731
private WorkloadManagementTransportInterceptor.RequestHandler<TransportRequest> sut;
2832
private ThreadPool threadPool;
33+
private QueryGroupService queryGroupService;
2934

3035
private TestTransportRequestHandler<TransportRequest> actualHandler;
3136

3237
public void setUp() throws Exception {
3338
super.setUp();
3439
threadPool = new TestThreadPool(getTestName());
3540
actualHandler = new TestTransportRequestHandler<>();
41+
queryGroupService = mock(QueryGroupService.class);
3642

37-
sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler);
43+
sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler, queryGroupService);
3844
}
3945

4046
public void tearDown() throws Exception {
4147
super.tearDown();
4248
threadPool.shutdown();
4349
}
4450

45-
public void testMessageReceivedForSearchWorkload() throws Exception {
51+
public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Exception {
4652
ShardSearchRequest request = mock(ShardSearchRequest.class);
4753
QueryGroupTask spyTask = getSpyTask();
48-
54+
doNothing().when(queryGroupService).rejectIfNeeded(anyString());
4955
sut.messageReceived(request, mock(TransportChannel.class), spyTask);
5056
assertTrue(sut.isSearchWorkloadRequest(spyTask));
5157
}
5258

59+
public void testMessageReceivedForSearchWorkload_RejectionCase() throws Exception {
60+
ShardSearchRequest request = mock(ShardSearchRequest.class);
61+
QueryGroupTask spyTask = getSpyTask();
62+
doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(anyString());
63+
64+
assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.messageReceived(request, mock(TransportChannel.class), spyTask));
65+
}
66+
5367
public void testMessageReceivedForNonSearchWorkload() throws Exception {
5468
IndexRequest indexRequest = mock(IndexRequest.class);
5569
Task task = mock(Task.class);
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.wlm.listeners;
10+
11+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
12+
import org.opensearch.test.OpenSearchTestCase;
13+
import org.opensearch.threadpool.TestThreadPool;
14+
import org.opensearch.threadpool.ThreadPool;
15+
import org.opensearch.wlm.QueryGroupService;
16+
import org.opensearch.wlm.QueryGroupTask;
17+
18+
import static org.mockito.Mockito.doNothing;
19+
import static org.mockito.Mockito.doThrow;
20+
import static org.mockito.Mockito.mock;
21+
22+
public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearchTestCase {
23+
ThreadPool testThreadPool;
24+
QueryGroupService queryGroupService;
25+
QueryGroupRequestRejectionOperationListener sut;
26+
27+
public void setUp() throws Exception {
28+
super.setUp();
29+
testThreadPool = new TestThreadPool("RejectionTestThreadPool");
30+
queryGroupService = mock(QueryGroupService.class);
31+
sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool);
32+
}
33+
34+
public void tearDown() throws Exception {
35+
super.tearDown();
36+
testThreadPool.shutdown();
37+
}
38+
39+
public void testRejectionCase() {
40+
final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t";
41+
testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId);
42+
doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(testQueryGroupId);
43+
assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null));
44+
}
45+
46+
public void testNonRejectionCase() {
47+
final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t";
48+
testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId);
49+
doNothing().when(queryGroupService).rejectIfNeeded(testQueryGroupId);
50+
51+
sut.onRequestStart(null);
52+
}
53+
}

0 commit comments

Comments
 (0)