Skip to content

Commit 8e13c88

Browse files
msfrohprudhvigodithi
authored andcommitted
Stop processing search requests when _msearch is canceled (opensearch-project#17005)
Prior to this fix, the _msearch API would keep running search requests even after being canceled. With this change, we explicitly check if the task has been canceled before kicking off subsequent requests. --------- Signed-off-by: Michael Froh <[email protected]>
1 parent 0d7ac2c commit 8e13c88

File tree

17 files changed

+157
-11
lines changed

17 files changed

+157
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
111111
- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))
112112
- Fix multi-value sort for unsigned long ([#16732](https://github.com/opensearch-project/OpenSearch/pull/16732))
113113
- The `phone-search` analyzer no longer emits the tel/sip prefix, international calling code, extension numbers and unformatted input as a token ([#16993](https://github.com/opensearch-project/OpenSearch/pull/16993))
114+
- Stop processing search requests when _msearch request is cancelled ([#17005](https://github.com/opensearch-project/OpenSearch/pull/17005))
114115
- Fix GRPC AUX_TRANSPORT_PORT and SETTING_GRPC_PORT settings and remove lingering HTTP terminology ([#17037](https://github.com/opensearch-project/OpenSearch/pull/17037))
115116
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
116117

buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class ThirdPartyAuditTask extends DefaultTask {
9494
CliMain.EXIT_VIOLATION,
9595
CliMain.EXIT_UNSUPPORTED_JDK
9696
);
97-
private static final String JDK_JAR_HELL_MAIN_CLASS = "org.opensearch.bootstrap.JdkJarHellCheck";
97+
private static final String JDK_JAR_HELL_MAIN_CLASS = "org.opensearch.common.bootstrap.JdkJarHellCheck";
9898

9999
private Set<String> missingClassExcludes = new TreeSet<>();
100100

buildSrc/src/testFixtures/java/org/opensearch/gradle/test/TestClasspathUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class TestClasspathUtils {
4848
public static void setupJarJdkClasspath(File projectRoot) {
4949
try {
5050
URL originLocation = TestClasspathUtils.class.getClassLoader()
51-
.loadClass("org.opensearch.bootstrap.JdkJarHellCheck")
51+
.loadClass("org.opensearch.common.bootstrap.JdkJarHellCheck")
5252
.getProtectionDomain()
5353
.getCodeSource()
5454
.getLocation();

distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/InstallPluginCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
import org.bouncycastle.openpgp.operator.jcajce.JcaPGPContentVerifierBuilderProvider;
5353
import org.opensearch.Build;
5454
import org.opensearch.Version;
55-
import org.opensearch.bootstrap.JarHell;
55+
import org.opensearch.common.bootstrap.JarHell;
5656
import org.opensearch.cli.EnvironmentAwareCommand;
5757
import org.opensearch.cli.ExitCodes;
5858
import org.opensearch.cli.Terminal;

libs/common/src/main/java/org/opensearch/bootstrap/JarHell.java renamed to libs/common/src/main/java/org/opensearch/common/bootstrap/JarHell.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* GitHub history for details.
3131
*/
3232

33-
package org.opensearch.bootstrap;
33+
package org.opensearch.common.bootstrap;
3434

3535
import org.opensearch.common.SuppressForbidden;
3636
import org.opensearch.common.io.PathUtils;

libs/common/src/main/java/org/opensearch/bootstrap/JdkJarHellCheck.java renamed to libs/common/src/main/java/org/opensearch/common/bootstrap/JdkJarHellCheck.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* GitHub history for details.
3030
*/
3131

32-
package org.opensearch.bootstrap;
32+
package org.opensearch.common.bootstrap;
3333

3434
import org.opensearch.common.SuppressForbidden;
3535

libs/common/src/main/java/org/opensearch/bootstrap/package-info.java renamed to libs/common/src/main/java/org/opensearch/common/bootstrap/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
*/
88

99
/** Contains JarHell Classes */
10-
package org.opensearch.bootstrap;
10+
package org.opensearch.common.bootstrap;

libs/common/src/test/java/org/opensearch/bootstrap/JarHellTests.java renamed to libs/common/src/test/java/org/opensearch/common/bootstrap/JarHellTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* GitHub history for details.
3131
*/
3232

33-
package org.opensearch.bootstrap;
33+
package org.opensearch.common.bootstrap;
3434

3535
import org.opensearch.common.io.PathUtils;
3636
import org.opensearch.core.common.Strings;

plugins/ingest-attachment/src/main/java/org/opensearch/ingest/attachment/TikaImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import org.apache.tika.parser.ParserDecorator;
4242
import org.opensearch.SpecialPermission;
4343
import org.opensearch.bootstrap.FilePermissionUtils;
44-
import org.opensearch.bootstrap.JarHell;
44+
import org.opensearch.common.bootstrap.JarHell;
4545
import org.opensearch.common.SuppressForbidden;
4646
import org.opensearch.common.io.PathUtils;
4747

server/src/main/java/org/opensearch/action/search/TransportMultiSearchAction.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
4545
import org.opensearch.core.action.ActionListener;
4646
import org.opensearch.core.common.io.stream.Writeable;
47+
import org.opensearch.core.tasks.TaskCancelledException;
48+
import org.opensearch.core.tasks.TaskId;
49+
import org.opensearch.tasks.CancellableTask;
4750
import org.opensearch.tasks.Task;
4851
import org.opensearch.threadpool.ThreadPool;
4952
import org.opensearch.transport.TransportService;
@@ -193,6 +196,19 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
193196
if (responseCounter.decrementAndGet() == 0) {
194197
assert requests.isEmpty();
195198
finish();
199+
} else if (isCancelled(request.request.getParentTask())) {
200+
// Drain the rest of the queue
201+
SearchRequestSlot request;
202+
while ((request = requests.poll()) != null) {
203+
responses.set(
204+
request.responseSlot,
205+
new MultiSearchResponse.Item(null, new TaskCancelledException("Parent task was cancelled"))
206+
);
207+
if (responseCounter.decrementAndGet() == 0) {
208+
assert requests.isEmpty();
209+
finish();
210+
}
211+
}
196212
} else {
197213
if (thread == Thread.currentThread()) {
198214
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
@@ -220,6 +236,14 @@ private long buildTookInMillis() {
220236
});
221237
}
222238

239+
private boolean isCancelled(TaskId taskId) {
240+
if (taskId.isSet()) {
241+
CancellableTask task = taskManager.getCancellableTask(taskId.getId());
242+
return task != null && task.isCancelled();
243+
}
244+
return false;
245+
}
246+
223247
/**
224248
* Slots a search request
225249
*

server/src/main/java/org/opensearch/bootstrap/Bootstrap.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.cli.UserException;
4848
import org.opensearch.common.PidFile;
4949
import org.opensearch.common.SuppressForbidden;
50+
import org.opensearch.common.bootstrap.JarHell;
5051
import org.opensearch.common.inject.CreationException;
5152
import org.opensearch.common.logging.LogConfigurator;
5253
import org.opensearch.common.logging.Loggers;

server/src/main/java/org/opensearch/bootstrap/Security.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.opensearch.cli.Command;
3636
import org.opensearch.common.SuppressForbidden;
37+
import org.opensearch.common.bootstrap.JarHell;
3738
import org.opensearch.common.io.PathUtils;
3839
import org.opensearch.common.settings.Setting;
3940
import org.opensearch.common.settings.Settings;

server/src/main/java/org/opensearch/plugins/PluginInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import com.fasterxml.jackson.core.json.JsonReadFeature;
3737

3838
import org.opensearch.Version;
39-
import org.opensearch.bootstrap.JarHell;
39+
import org.opensearch.common.bootstrap.JarHell;
4040
import org.opensearch.common.annotation.PublicApi;
4141
import org.opensearch.common.xcontent.json.JsonXContentParser;
4242
import org.opensearch.core.common.Strings;

server/src/main/java/org/opensearch/plugins/PluginsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import org.opensearch.OpenSearchException;
4444
import org.opensearch.Version;
4545
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
46-
import org.opensearch.bootstrap.JarHell;
46+
import org.opensearch.common.bootstrap.JarHell;
4747
import org.opensearch.common.collect.Tuple;
4848
import org.opensearch.common.inject.Module;
4949
import org.opensearch.common.lifecycle.LifecycleComponent;

server/src/test/java/org/opensearch/action/search/TransportMultiSearchActionTests.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import org.opensearch.common.settings.Settings;
5050
import org.opensearch.core.action.ActionListener;
5151
import org.opensearch.search.internal.InternalSearchResponse;
52+
import org.opensearch.tasks.CancellableTask;
5253
import org.opensearch.tasks.Task;
54+
import org.opensearch.tasks.TaskListener;
5355
import org.opensearch.tasks.TaskManager;
5456
import org.opensearch.telemetry.tracing.noop.NoopTracer;
5557
import org.opensearch.test.OpenSearchTestCase;
@@ -62,7 +64,9 @@
6264
import java.util.IdentityHashMap;
6365
import java.util.List;
6466
import java.util.Set;
67+
import java.util.concurrent.CountDownLatch;
6568
import java.util.concurrent.ExecutorService;
69+
import java.util.concurrent.TimeUnit;
6670
import java.util.concurrent.atomic.AtomicInteger;
6771
import java.util.concurrent.atomic.AtomicReference;
6872

@@ -289,4 +293,118 @@ public void testDefaultMaxConcurrentSearches() {
289293
assertThat(result, equalTo(1));
290294
}
291295

296+
public void testCancellation() {
297+
// Initialize dependencies of TransportMultiSearchAction
298+
Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build();
299+
ActionFilters actionFilters = mock(ActionFilters.class);
300+
when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
301+
ThreadPool threadPool = new ThreadPool(settings);
302+
TransportService transportService = new TransportService(
303+
Settings.EMPTY,
304+
mock(Transport.class),
305+
threadPool,
306+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
307+
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()),
308+
null,
309+
Collections.emptySet(),
310+
NoopTracer.INSTANCE
311+
) {
312+
@Override
313+
public TaskManager getTaskManager() {
314+
return taskManager;
315+
}
316+
};
317+
ClusterService clusterService = mock(ClusterService.class);
318+
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build());
319+
320+
// Keep track of the number of concurrent searches started by multi search api,
321+
// and if there are more searches than is allowed create an error and remember that.
322+
int maxAllowedConcurrentSearches = 1; // Allow 1 search at a time.
323+
AtomicInteger counter = new AtomicInteger();
324+
AtomicReference<AssertionError> errorHolder = new AtomicReference<>();
325+
// randomize whether or not requests are executed asynchronously
326+
ExecutorService executorService = threadPool.executor(ThreadPool.Names.GENERIC);
327+
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
328+
CountDownLatch countDownLatch = new CountDownLatch(1);
329+
CancellableTask[] parentTask = new CancellableTask[1];
330+
NodeClient client = new NodeClient(settings, threadPool) {
331+
@Override
332+
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
333+
if (parentTask[0] != null && parentTask[0].isCancelled()) {
334+
fail("Should not execute search after parent task is cancelled");
335+
}
336+
try {
337+
countDownLatch.await(10, TimeUnit.MILLISECONDS);
338+
} catch (InterruptedException e) {
339+
throw new RuntimeException(e);
340+
}
341+
342+
requests.add(request);
343+
executorService.execute(() -> {
344+
counter.decrementAndGet();
345+
listener.onResponse(
346+
new SearchResponse(
347+
InternalSearchResponse.empty(),
348+
null,
349+
0,
350+
0,
351+
0,
352+
0L,
353+
ShardSearchFailure.EMPTY_ARRAY,
354+
SearchResponse.Clusters.EMPTY
355+
)
356+
);
357+
});
358+
}
359+
360+
@Override
361+
public String getLocalNodeId() {
362+
return "local_node_id";
363+
}
364+
};
365+
366+
TransportMultiSearchAction action = new TransportMultiSearchAction(
367+
threadPool,
368+
actionFilters,
369+
transportService,
370+
clusterService,
371+
10,
372+
System::nanoTime,
373+
client
374+
);
375+
376+
// Execute the multi search api and fail if we find an error after executing:
377+
try {
378+
/*
379+
* Allow for a large number of search requests in a single batch as previous implementations could stack overflow if the number
380+
* of requests in a single batch was large
381+
*/
382+
int numSearchRequests = scaledRandomIntBetween(1024, 8192);
383+
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
384+
multiSearchRequest.maxConcurrentSearchRequests(maxAllowedConcurrentSearches);
385+
for (int i = 0; i < numSearchRequests; i++) {
386+
multiSearchRequest.add(new SearchRequest());
387+
}
388+
MultiSearchResponse[] responses = new MultiSearchResponse[1];
389+
Exception[] exceptions = new Exception[1];
390+
parentTask[0] = (CancellableTask) action.execute(multiSearchRequest, new TaskListener<>() {
391+
@Override
392+
public void onResponse(Task task, MultiSearchResponse items) {
393+
responses[0] = items;
394+
}
395+
396+
@Override
397+
public void onFailure(Task task, Exception e) {
398+
exceptions[0] = e;
399+
}
400+
});
401+
parentTask[0].cancel("Giving up");
402+
countDownLatch.countDown();
403+
404+
assertNull(responses[0]);
405+
assertNull(exceptions[0]);
406+
} finally {
407+
assertTrue(OpenSearchTestCase.terminate(threadPool));
408+
}
409+
}
292410
}

server/src/test/java/org/opensearch/plugins/PluginsServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.apache.lucene.util.Constants;
3939
import org.opensearch.LegacyESVersion;
4040
import org.opensearch.Version;
41-
import org.opensearch.bootstrap.JarHell;
41+
import org.opensearch.common.bootstrap.JarHell;
4242
import org.opensearch.common.collect.Tuple;
4343
import org.opensearch.common.io.PathUtils;
4444
import org.opensearch.common.settings.Settings;

test/framework/src/main/java/org/opensearch/bootstrap/BootstrapForTesting.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.lucene.tests.util.LuceneTestCase;
4040
import org.opensearch.common.Booleans;
4141
import org.opensearch.common.SuppressForbidden;
42+
import org.opensearch.common.bootstrap.JarHell;
4243
import org.opensearch.common.io.PathUtils;
4344
import org.opensearch.common.network.IfConfig;
4445
import org.opensearch.common.network.NetworkAddress;

0 commit comments

Comments
 (0)