Skip to content

Add tracking for long running SearchTask post cancellation #17726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
- Add tracking for long-running SearchTask post cancellation ([#17726](https://github.com/opensearch-project/OpenSearch/pull/17726))
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
- Add FilterFieldType for developers who want to wrap MappedFieldType ([#17627](https://github.com/opensearch-project/OpenSearch/pull/17627))
- [Rule Based Auto-tagging] Add in-memory rule processing service ([#17365](https://github.com/opensearch-project/OpenSearch/pull/17365))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Base class for search task cancellation statistics.
*/
public abstract class BaseSearchTaskCancellationStats implements ToXContentObject, Writeable {

private final long currentLongRunningCancelledTaskCount;
private final long totalLongRunningCancelledTaskCount;

public BaseSearchTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
this.currentLongRunningCancelledTaskCount = currentTaskCount;
this.totalLongRunningCancelledTaskCount = totalTaskCount;
}

public BaseSearchTaskCancellationStats(StreamInput in) throws IOException {
this.currentLongRunningCancelledTaskCount = in.readVLong();
this.totalLongRunningCancelledTaskCount = in.readVLong();
}

protected long getCurrentLongRunningCancelledTaskCount() {
return this.currentLongRunningCancelledTaskCount;
}

protected long getTotalLongRunningCancelledTaskCount() {
return this.totalLongRunningCancelledTaskCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
return builder.endObject();

Check warning on line 51 in server/src/main/java/org/opensearch/tasks/BaseSearchTaskCancellationStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/tasks/BaseSearchTaskCancellationStats.java#L48-L51

Added lines #L48 - L51 were not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentLongRunningCancelledTaskCount);
out.writeVLong(totalLongRunningCancelledTaskCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BaseSearchTaskCancellationStats that = (BaseSearchTaskCancellationStats) o;
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
}

@Override
public int hashCode() {
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,67 +9,19 @@
package org.opensearch.tasks;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Holds monitoring service stats specific to search shard task.
*/
public class SearchShardTaskCancellationStats implements ToXContentObject, Writeable {

private final long currentLongRunningCancelledTaskCount;
private final long totalLongRunningCancelledTaskCount;
public class SearchShardTaskCancellationStats extends BaseSearchTaskCancellationStats {

public SearchShardTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
this.currentLongRunningCancelledTaskCount = currentTaskCount;
this.totalLongRunningCancelledTaskCount = totalTaskCount;
super(currentTaskCount, totalTaskCount);
}

public SearchShardTaskCancellationStats(StreamInput in) throws IOException {
this.currentLongRunningCancelledTaskCount = in.readVLong();
this.totalLongRunningCancelledTaskCount = in.readVLong();
}

// package private for testing
protected long getCurrentLongRunningCancelledTaskCount() {
return this.currentLongRunningCancelledTaskCount;
}

// package private for testing
protected long getTotalLongRunningCancelledTaskCount() {
return this.totalLongRunningCancelledTaskCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentLongRunningCancelledTaskCount);
out.writeVLong(totalLongRunningCancelledTaskCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardTaskCancellationStats that = (SearchShardTaskCancellationStats) o;
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
}

@Override
public int hashCode() {
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
super(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Holds monitoring service stats specific to search task.
*/
public class SearchTaskCancellationStats extends BaseSearchTaskCancellationStats {

public SearchTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
super(currentTaskCount, totalTaskCount);
}

public SearchTaskCancellationStats(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchTask;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.threadpool.Scheduler;
Expand All @@ -32,7 +33,7 @@
public class TaskCancellationMonitoringService extends AbstractLifecycleComponent implements TaskManager.TaskEventListeners {

private static final Logger logger = LogManager.getLogger(TaskCancellationMonitoringService.class);
private final static List<Class<? extends CancellableTask>> TASKS_TO_TRACK = Arrays.asList(SearchShardTask.class);
private final static List<Class<? extends CancellableTask>> TASKS_TO_TRACK = Arrays.asList(SearchShardTask.class, SearchTask.class);

private volatile Scheduler.Cancellable scheduledFuture;
private final ThreadPool threadPool;
Expand Down Expand Up @@ -146,6 +147,10 @@ public TaskCancellationStats stats() {
Map<Class<? extends CancellableTask>, List<CancellableTask>> currentRunningCancelledTasks =
getCurrentRunningTasksPostCancellation();
return new TaskCancellationStats(
new SearchTaskCancellationStats(
Optional.of(currentRunningCancelledTasks).map(mapper -> mapper.get(SearchTask.class)).map(List::size).orElse(0),
cancellationStatsHolder.get(SearchTask.class).totalLongRunningCancelledTaskCount.count()
),
new SearchShardTaskCancellationStats(
Optional.of(currentRunningCancelledTasks).map(mapper -> mapper.get(SearchShardTask.class)).map(List::size).orElse(0),
cancellationStatsHolder.get(SearchShardTask.class).totalLongRunningCancelledTaskCount.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.tasks;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -22,13 +23,23 @@
*/
public class TaskCancellationStats implements ToXContentFragment, Writeable {

private final SearchTaskCancellationStats searchTaskCancellationStats;
private final SearchShardTaskCancellationStats searchShardTaskCancellationStats;

public TaskCancellationStats(SearchShardTaskCancellationStats searchShardTaskCancellationStats) {
public TaskCancellationStats(
SearchTaskCancellationStats searchTaskCancellationStats,
SearchShardTaskCancellationStats searchShardTaskCancellationStats
) {
this.searchTaskCancellationStats = searchTaskCancellationStats;
this.searchShardTaskCancellationStats = searchShardTaskCancellationStats;
}

public TaskCancellationStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
searchTaskCancellationStats = new SearchTaskCancellationStats(in);
} else {
searchTaskCancellationStats = new SearchTaskCancellationStats(0, 0);

Check warning on line 41 in server/src/main/java/org/opensearch/tasks/TaskCancellationStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/tasks/TaskCancellationStats.java#L41

Added line #L41 was not covered by tests
}
searchShardTaskCancellationStats = new SearchShardTaskCancellationStats(in);
}

Expand All @@ -37,15 +48,24 @@
return this.searchShardTaskCancellationStats;
}

// package private for testing
protected SearchTaskCancellationStats getSearchTaskCancellationStats() {
return this.searchTaskCancellationStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("task_cancellation");
builder.field("search_task", searchTaskCancellationStats);

Check warning on line 59 in server/src/main/java/org/opensearch/tasks/TaskCancellationStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/tasks/TaskCancellationStats.java#L59

Added line #L59 was not covered by tests
builder.field("search_shard_task", searchShardTaskCancellationStats);
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
searchTaskCancellationStats.writeTo(out);
}
searchShardTaskCancellationStats.writeTo(out);
}

Expand All @@ -54,11 +74,12 @@
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TaskCancellationStats that = (TaskCancellationStats) o;
return Objects.equals(searchShardTaskCancellationStats, that.searchShardTaskCancellationStats);
return Objects.equals(searchTaskCancellationStats, that.searchTaskCancellationStats)
&& Objects.equals(searchShardTaskCancellationStats, that.searchShardTaskCancellationStats);
}

@Override
public int hashCode() {
return Objects.hash(searchShardTaskCancellationStats);
return Objects.hash(searchTaskCancellationStats, searchShardTaskCancellationStats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.test.AbstractWireSerializingTestCase;

public class SearchTaskCancellationStatsTests extends AbstractWireSerializingTestCase<SearchTaskCancellationStats> {
@Override
protected Writeable.Reader<SearchTaskCancellationStats> instanceReader() {
return SearchTaskCancellationStats::new;
}

@Override
protected SearchTaskCancellationStats createTestInstance() {
return randomInstance();
}

public static SearchTaskCancellationStats randomInstance() {
return new SearchTaskCancellationStats(randomNonNegativeLong(), randomNonNegativeLong());
}
}
Loading
Loading