Skip to content

Commit b676849

Browse files
committed
Query insights exporters implementation
Signed-off-by: Chenyang Ji <[email protected]>
1 parent 5d939b9 commit b676849

18 files changed

+612
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- [Query Insights] Add Exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
89
- Constant Keyword Field ([#12285](https://github.com/opensearch-project/OpenSearch/pull/12285))
910
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
1011
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Collection<Object> createComponents(
7070
final Supplier<RepositoriesService> repositoriesServiceSupplier
7171
) {
7272
// create top n queries service
73-
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
73+
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
7474
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
7575
}
7676

@@ -110,7 +110,8 @@ public List<Setting<?>> getSettings() {
110110
// Settings for top N queries
111111
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
112112
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
113-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
113+
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
114+
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
114115
);
115116
}
116117
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
14+
15+
import java.io.IOException;
16+
import java.util.List;
17+
18+
/**
19+
* Base Abstract class for Query Insights exporters
20+
*/
21+
public abstract class AbstractExporter {
22+
/**
23+
* Logger of exporter
24+
*/
25+
protected final Logger logger = LogManager.getLogger(this.getClass());
26+
27+
/**
28+
* Constructor of AbstractExporter
29+
*/
30+
protected AbstractExporter() {}
31+
32+
/**
33+
* Export a list of SearchQueryRecord to the exporter sink
34+
*
35+
* @param records list of {@link SearchQueryRecord}
36+
* @return True if export succeed, false otherwise
37+
*/
38+
public abstract boolean export(final List<SearchQueryRecord> records);
39+
40+
/**
41+
* Close the exporter sink
42+
*
43+
* @throws IOException IOException
44+
*/
45+
public void close() throws IOException {}
46+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
12+
13+
import java.util.List;
14+
15+
/**
16+
* Debug exporter for development purpose
17+
*/
18+
public final class DebugExporter extends AbstractExporter {
19+
/**
20+
* Constructor of DebugExporter
21+
*/
22+
public DebugExporter() {}
23+
24+
/**
25+
* Write the list of SearchQueryRecord to debug log
26+
*
27+
* @param records list of {@link SearchQueryRecord}
28+
* @return true
29+
*/
30+
@Override
31+
public boolean export(final List<SearchQueryRecord> records) {
32+
logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString());
33+
return true;
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.opensearch.action.bulk.BulkRequestBuilder;
12+
import org.opensearch.action.index.IndexRequest;
13+
import org.opensearch.action.support.WriteRequest;
14+
import org.opensearch.client.Client;
15+
import org.opensearch.common.unit.TimeValue;
16+
import org.opensearch.common.xcontent.XContentFactory;
17+
import org.opensearch.core.xcontent.ToXContent;
18+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
19+
import org.joda.time.DateTime;
20+
import org.joda.time.DateTimeZone;
21+
import org.joda.time.format.DateTimeFormatter;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Local index exporter for exporting query insights data to local OpenSearch indices.
27+
*/
28+
public final class LocalIndexExporter extends AbstractExporter {
29+
final private Client client;
30+
final private DateTimeFormatter indexPattern;
31+
32+
/**
33+
* Constructor of LocalIndexExporter
34+
*
35+
* @param client OS client
36+
* @param indexPattern the pattern of index to export to
37+
*/
38+
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
39+
this.indexPattern = indexPattern;
40+
this.client = client;
41+
}
42+
43+
/**
44+
* Export a list of SearchQueryRecord to a local index
45+
*
46+
* @param records list of {@link SearchQueryRecord}
47+
* @return True if export succeed, false otherwise
48+
*/
49+
@Override
50+
public boolean export(final List<SearchQueryRecord> records) {
51+
if (records == null || records.size() == 0) {
52+
return true;
53+
}
54+
try {
55+
final String index = getDateTimeFromFormat();
56+
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk()
57+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
58+
.setTimeout(TimeValue.timeValueMinutes(1));
59+
for (SearchQueryRecord record : records) {
60+
bulkRequestBuilder.add(
61+
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
62+
);
63+
}
64+
bulkRequestBuilder.execute().actionGet();
65+
return true;
66+
} catch (final Exception e) {
67+
logger.error("Unable to index query insights data: ", e);
68+
return false;
69+
}
70+
}
71+
72+
private String getDateTimeFromFormat() {
73+
return indexPattern.print(DateTime.now(DateTimeZone.UTC));
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.opensearch.client.Client;
12+
import org.opensearch.common.settings.Settings;
13+
import org.joda.time.format.DateTimeFormat;
14+
15+
import java.util.Locale;
16+
17+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
18+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
19+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
20+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
21+
22+
/**
23+
* Factory class for validating and creating exporters based on provided settings
24+
*/
25+
public class QueryInsightsExporterFactory {
26+
final private Client client;
27+
28+
/**
29+
* Constructor of QueryInsightsExporterFactory
30+
*
31+
* @param client OS client
32+
*/
33+
public QueryInsightsExporterFactory(final Client client) {
34+
this.client = client;
35+
}
36+
37+
/**
38+
* Validate exporter sink config
39+
*
40+
* @param settings exporter sink config {@link Settings}
41+
* @throws IllegalArgumentException if provided exporter sink config settings are invalid
42+
*/
43+
public void validateExporterConfig(final Settings settings) throws IllegalArgumentException {
44+
// Disable exporter if the EXPORTER_TYPE setting is null
45+
if (settings.get(EXPORTER_TYPE) == null) {
46+
return;
47+
}
48+
SinkType type;
49+
try {
50+
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
51+
} catch (IllegalArgumentException e) {
52+
throw new IllegalArgumentException(String.format(Locale.ROOT, "Invalid exporter type [%s]", settings.get(EXPORTER_TYPE)));
53+
}
54+
switch (type) {
55+
case LOCAL_INDEX:
56+
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
57+
if (indexPattern.length() == 0) {
58+
throw new IllegalArgumentException("Empty index pattern configured for the exporter");
59+
}
60+
try {
61+
DateTimeFormat.forPattern(indexPattern);
62+
} catch (Exception e) {
63+
throw new IllegalArgumentException(
64+
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern)
65+
);
66+
}
67+
}
68+
}
69+
70+
/**
71+
* Create an exporter based on provided parameters
72+
*
73+
* @param type The type of exporter to create
74+
* @param indexPattern the index pattern if creating a index exporter
75+
* @return AbstractExporter the created exporter sink
76+
*/
77+
public AbstractExporter createExporter(SinkType type, String indexPattern) {
78+
switch (type) {
79+
case LOCAL_INDEX:
80+
return new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern));
81+
default:
82+
return new DebugExporter();
83+
}
84+
}
85+
86+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import java.util.Locale;
12+
13+
/**
14+
* Type of supported sinks
15+
*/
16+
public enum SinkType {
17+
/** debug exporter */
18+
DEBUG,
19+
/** local index exporter */
20+
LOCAL_INDEX;
21+
22+
@Override
23+
public String toString() {
24+
return super.toString().toLowerCase(Locale.ROOT);
25+
}
26+
27+
/**
28+
* Parse SinkType from String
29+
* @param type the String representation of the SinkType
30+
* @return SinkType
31+
*/
32+
public static SinkType parse(final String type) {
33+
return valueOf(type.toUpperCase(Locale.ROOT));
34+
}
35+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Query Insights exporter
11+
*/
12+
package org.opensearch.plugin.insights.core.exporter;

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88

99
package org.opensearch.plugin.insights.core.service;
1010

11+
import org.opensearch.client.Client;
1112
import org.opensearch.common.inject.Inject;
1213
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
14+
import org.opensearch.common.settings.ClusterSettings;
15+
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
1316
import org.opensearch.plugin.insights.rules.model.MetricType;
1417
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
1518
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
@@ -23,6 +26,8 @@
2326
import java.util.Map;
2427
import java.util.concurrent.LinkedBlockingQueue;
2528

29+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS;
30+
2631
/**
2732
* Service responsible for gathering, analyzing, storing and exporting
2833
* information related to search queries
@@ -59,18 +64,27 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
5964
/**
6065
* Constructor of the QueryInsightsService
6166
*
62-
* @param threadPool The OpenSearch thread pool to run async tasks
67+
* @param clusterSettings OpenSearch cluster level settings
68+
* @param threadPool The OpenSearch thread pool to run async tasks
69+
* @param client OS client
6370
*/
6471
@Inject
65-
public QueryInsightsService(final ThreadPool threadPool) {
72+
public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) {
6673
enableCollect = new HashMap<>();
6774
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
75+
this.threadPool = threadPool;
76+
final QueryInsightsExporterFactory queryInsightsExporterFactory = new QueryInsightsExporterFactory(client);
77+
// initialize top n queries services and configurations consumers
6878
topQueriesServices = new HashMap<>();
6979
for (MetricType metricType : MetricType.allMetricTypes()) {
7080
enableCollect.put(metricType, false);
71-
topQueriesServices.put(metricType, new TopQueriesService(metricType));
81+
topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory));
7282
}
73-
this.threadPool = threadPool;
83+
clusterSettings.addSettingsUpdateConsumer(
84+
TOP_N_LATENCY_EXPORTER_SETTINGS,
85+
(settings -> getTopQueriesService(MetricType.LATENCY).setExporter(settings)),
86+
(settings -> getTopQueriesService(MetricType.LATENCY).validateExporterConfig(settings))
87+
);
7488
}
7589

7690
/**

0 commit comments

Comments
 (0)