Skip to content

Commit dcb30c3

Browse files
committed
Add X-Opaque-Id to search request metadata for query insights (#13374)
--------- Signed-off-by: Chenyang Ji <[email protected]> (cherry picked from commit 9d3cf43) Signed-off-by: Chenyang Ji <[email protected]>
1 parent d2da9bb commit dcb30c3

File tree

6 files changed

+53
-5
lines changed

6 files changed

+53
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
1515
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
1616
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
17+
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
1718

1819
### Dependencies
1920
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.plugin.insights.rules.model.Attribute;
2222
import org.opensearch.plugin.insights.rules.model.MetricType;
2323
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
24+
import org.opensearch.tasks.Task;
2425

2526
import java.util.Collections;
2627
import java.util.HashMap;
@@ -138,6 +139,15 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
138139
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
139140
attributes.put(Attribute.INDICES, request.indices());
140141
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
142+
143+
Map<String, Object> labels = new HashMap<>();
144+
// Retrieve user provided label if exists
145+
String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID);
146+
if (userProvidedLabel != null) {
147+
labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
148+
}
149+
attributes.put(Attribute.LABELS, labels);
150+
// construct SearchQueryRecord from attributes and measurements
141151
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
142152
queryInsightsService.addRecord(record);
143153
} catch (Exception e) {

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ public enum Attribute {
4343
/**
4444
* The node id for this request
4545
*/
46-
NODE_ID;
46+
NODE_ID,
47+
/**
48+
* Custom search request labels
49+
*/
50+
LABELS;
4751

4852
/**
4953
* Read an Attribute from a StreamInput

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,38 @@
1111
import org.opensearch.action.search.SearchPhaseContext;
1212
import org.opensearch.action.search.SearchRequest;
1313
import org.opensearch.action.search.SearchRequestContext;
14+
import org.opensearch.action.search.SearchTask;
1415
import org.opensearch.action.search.SearchType;
1516
import org.opensearch.cluster.service.ClusterService;
17+
import org.opensearch.common.collect.Tuple;
1618
import org.opensearch.common.settings.ClusterSettings;
1719
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.common.util.concurrent.ThreadContext;
1821
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
1922
import org.opensearch.plugin.insights.core.service.TopQueriesService;
23+
import org.opensearch.plugin.insights.rules.model.Attribute;
2024
import org.opensearch.plugin.insights.rules.model.MetricType;
25+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
2126
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
2227
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
2328
import org.opensearch.search.aggregations.support.ValueType;
2429
import org.opensearch.search.builder.SearchSourceBuilder;
30+
import org.opensearch.tasks.Task;
2531
import org.opensearch.test.OpenSearchTestCase;
32+
import org.opensearch.threadpool.ThreadPool;
2633
import org.junit.Before;
2734

2835
import java.util.ArrayList;
36+
import java.util.Collections;
2937
import java.util.HashMap;
3038
import java.util.List;
39+
import java.util.Locale;
3140
import java.util.Map;
3241
import java.util.concurrent.CountDownLatch;
3342
import java.util.concurrent.Phaser;
3443

44+
import org.mockito.ArgumentCaptor;
45+
3546
import static org.mockito.ArgumentMatchers.any;
3647
import static org.mockito.Mockito.mock;
3748
import static org.mockito.Mockito.times;
@@ -47,6 +58,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
4758
private final SearchRequest searchRequest = mock(SearchRequest.class);
4859
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
4960
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
61+
private final ThreadPool threadPool = mock(ThreadPool.class);
5062
private ClusterService clusterService;
5163

5264
@Before
@@ -60,15 +72,21 @@ public void setup() {
6072
clusterService = new ClusterService(settings, clusterSettings, null);
6173
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
6274
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
75+
76+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
77+
threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>()));
78+
when(threadPool.getThreadContext()).thenReturn(threadContext);
6379
}
6480

81+
@SuppressWarnings("unchecked")
6582
public void testOnRequestEnd() throws InterruptedException {
6683
Long timestamp = System.currentTimeMillis() - 100L;
6784
SearchType searchType = SearchType.QUERY_THEN_FETCH;
6885

6986
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
7087
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
7188
searchSourceBuilder.size(0);
89+
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));
7290

7391
String[] indices = new String[] { "index-1", "index-2" };
7492

@@ -88,10 +106,19 @@ public void testOnRequestEnd() throws InterruptedException {
88106
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
89107
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
90108
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
109+
when(searchPhaseContext.getTask()).thenReturn(task);
110+
ArgumentCaptor<SearchQueryRecord> captor = ArgumentCaptor.forClass(SearchQueryRecord.class);
91111

92112
queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);
93113

94-
verify(queryInsightsService, times(1)).addRecord(any());
114+
verify(queryInsightsService, times(1)).addRecord(captor.capture());
115+
SearchQueryRecord generatedRecord = captor.getValue();
116+
assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
117+
assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
118+
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
119+
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
120+
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
121+
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
95122
}
96123

97124
public void testConcurrentOnRequestEnd() throws InterruptedException {
@@ -101,6 +128,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
101128
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
102129
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
103130
searchSourceBuilder.size(0);
131+
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));
104132

105133
String[] indices = new String[] { "index-1", "index-2" };
106134

@@ -120,6 +148,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
120148
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
121149
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
122150
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
151+
when(searchPhaseContext.getTask()).thenReturn(task);
123152

124153
int numRequests = 50;
125154
Thread[] threads = new Thread[numRequests];

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ String formattedShardStats() {
107107
);
108108
}
109109
}
110+
111+
public SearchRequest getRequest() {
112+
return searchRequest;
113+
}
110114
}
111115

112116
enum ShardStatsFieldNames {

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ protected SearchRequestOperationsListener(final boolean enabled) {
4141
this.enabled = enabled;
4242
}
4343

44-
protected abstract void onPhaseStart(SearchPhaseContext context);
44+
protected void onPhaseStart(SearchPhaseContext context) {};
4545

46-
protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
46+
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {};
4747

48-
protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);
48+
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {};
4949

5050
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
5151

0 commit comments

Comments
 (0)