Skip to content

Commit a65de8b

Browse files
committed
Ensure that cancelled request returns
I tried cancelling a request and found that the client would hang. This change reports an exception for each remaining request instead. Signed-off-by: Michael Froh <[email protected]>
1 parent 30d127a commit a65de8b

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

server/src/main/java/org/opensearch/action/search/TransportMultiSearchAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
4545
import org.opensearch.core.action.ActionListener;
4646
import org.opensearch.core.common.io.stream.Writeable;
47+
import org.opensearch.core.tasks.TaskCancelledException;
4748
import org.opensearch.core.tasks.TaskId;
4849
import org.opensearch.tasks.CancellableTask;
4950
import org.opensearch.tasks.Task;
@@ -195,7 +196,20 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
195196
if (responseCounter.decrementAndGet() == 0) {
196197
assert requests.isEmpty();
197198
finish();
198-
} else if (isCancelled(request.request.getParentTask()) == false) {
199+
} else if (isCancelled(request.request.getParentTask())) {
200+
// Drain the rest of the queue
201+
SearchRequestSlot request;
202+
while ((request = requests.poll()) != null) {
203+
responses.set(
204+
request.responseSlot,
205+
new MultiSearchResponse.Item(null, new TaskCancelledException("Parent task was cancelled"))
206+
);
207+
if (responseCounter.decrementAndGet() == 0) {
208+
assert requests.isEmpty();
209+
finish();
210+
}
211+
}
212+
} else {
199213
if (thread == Thread.currentThread()) {
200214
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
201215
threadPool.generic()

0 commit comments

Comments
 (0)