Skip to content

Commit 0ddf4bd

Browse files
authored
Query insights exporters implementation (opensearch-project#12982)
--------- Signed-off-by: Chenyang Ji <[email protected]>
1 parent 1cded65 commit 0ddf4bd

18 files changed

+845
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
1919
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
2020
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
21+
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
2122

2223
### Dependencies
2324
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Collection<Object> createComponents(
7070
final Supplier<RepositoriesService> repositoriesServiceSupplier
7171
) {
7272
// create top n queries service
73-
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
73+
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
7474
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
7575
}
7676

@@ -110,7 +110,8 @@ public List<Setting<?>> getSettings() {
110110
// Settings for top N queries
111111
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
112112
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
113-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
113+
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
114+
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
114115
);
115116
}
116117
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
14+
15+
import java.util.List;
16+
17+
/**
18+
* Debug exporter for development purpose
19+
*/
20+
public final class DebugExporter implements QueryInsightsExporter {
21+
/**
22+
* Logger of the debug exporter
23+
*/
24+
private final Logger logger = LogManager.getLogger();
25+
26+
/**
27+
* Constructor of DebugExporter
28+
*/
29+
private DebugExporter() {}
30+
31+
private static class InstanceHolder {
32+
private static final DebugExporter INSTANCE = new DebugExporter();
33+
}
34+
35+
/**
36+
Get the singleton instance of DebugExporter
37+
*
38+
@return DebugExporter instance
39+
*/
40+
public static DebugExporter getInstance() {
41+
return InstanceHolder.INSTANCE;
42+
}
43+
44+
/**
45+
* Write the list of SearchQueryRecord to debug log
46+
*
47+
* @param records list of {@link SearchQueryRecord}
48+
*/
49+
@Override
50+
public void export(final List<SearchQueryRecord> records) {
51+
logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString());
52+
}
53+
54+
/**
55+
* Close the debugger exporter sink
56+
*/
57+
@Override
58+
public void close() {
59+
logger.debug("Closing the DebugExporter..");
60+
}
61+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.bulk.BulkRequestBuilder;
14+
import org.opensearch.action.bulk.BulkResponse;
15+
import org.opensearch.action.index.IndexRequest;
16+
import org.opensearch.client.Client;
17+
import org.opensearch.common.unit.TimeValue;
18+
import org.opensearch.common.xcontent.XContentFactory;
19+
import org.opensearch.core.action.ActionListener;
20+
import org.opensearch.core.xcontent.ToXContent;
21+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
22+
import org.joda.time.DateTime;
23+
import org.joda.time.DateTimeZone;
24+
import org.joda.time.format.DateTimeFormatter;
25+
26+
import java.util.List;
27+
28+
/**
29+
* Local index exporter for exporting query insights data to local OpenSearch indices.
30+
*/
31+
public final class LocalIndexExporter implements QueryInsightsExporter {
32+
/**
33+
* Logger of the local index exporter
34+
*/
35+
private final Logger logger = LogManager.getLogger();
36+
private final Client client;
37+
private DateTimeFormatter indexPattern;
38+
39+
/**
40+
* Constructor of LocalIndexExporter
41+
*
42+
* @param client OS client
43+
* @param indexPattern the pattern of index to export to
44+
*/
45+
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
46+
this.indexPattern = indexPattern;
47+
this.client = client;
48+
}
49+
50+
/**
51+
* Getter of indexPattern
52+
*
53+
* @return indexPattern
54+
*/
55+
public DateTimeFormatter getIndexPattern() {
56+
return indexPattern;
57+
}
58+
59+
/**
60+
* Setter of indexPattern
61+
*
62+
* @param indexPattern index pattern
63+
* @return the current LocalIndexExporter
64+
*/
65+
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
66+
this.indexPattern = indexPattern;
67+
return this;
68+
}
69+
70+
/**
71+
* Export a list of SearchQueryRecord to a local index
72+
*
73+
* @param records list of {@link SearchQueryRecord}
74+
*/
75+
@Override
76+
public void export(final List<SearchQueryRecord> records) {
77+
if (records == null || records.size() == 0) {
78+
return;
79+
}
80+
try {
81+
final String index = getDateTimeFromFormat();
82+
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
83+
for (SearchQueryRecord record : records) {
84+
bulkRequestBuilder.add(
85+
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
86+
);
87+
}
88+
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
89+
@Override
90+
public void onResponse(BulkResponse bulkItemResponses) {}
91+
92+
@Override
93+
public void onFailure(Exception e) {
94+
logger.error("Failed to execute bulk operation for query insights data: ", e);
95+
}
96+
});
97+
} catch (final Exception e) {
98+
logger.error("Unable to index query insights data: ", e);
99+
}
100+
}
101+
102+
/**
103+
* Close the exporter sink
104+
*/
105+
@Override
106+
public void close() {
107+
logger.debug("Closing the LocalIndexExporter..");
108+
}
109+
110+
private String getDateTimeFromFormat() {
111+
return indexPattern.print(DateTime.now(DateTimeZone.UTC));
112+
}
113+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
12+
13+
import java.io.Closeable;
14+
import java.util.List;
15+
16+
/**
17+
* Base interface for Query Insights exporters
18+
*/
19+
public interface QueryInsightsExporter extends Closeable {
20+
/**
21+
* Export a list of SearchQueryRecord to the exporter sink
22+
*
23+
* @param records list of {@link SearchQueryRecord}
24+
*/
25+
void export(final List<SearchQueryRecord> records);
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.exporter;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.client.Client;
14+
import org.opensearch.common.settings.Settings;
15+
import org.joda.time.format.DateTimeFormat;
16+
17+
import java.io.IOException;
18+
import java.util.HashSet;
19+
import java.util.Locale;
20+
import java.util.Set;
21+
22+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
23+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
24+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
25+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
26+
27+
/**
28+
* Factory class for validating and creating exporters based on provided settings
29+
*/
30+
public class QueryInsightsExporterFactory {
31+
/**
32+
* Logger of the query insights exporter factory
33+
*/
34+
private final Logger logger = LogManager.getLogger();
35+
final private Client client;
36+
final private Set<QueryInsightsExporter> exporters;
37+
38+
/**
39+
* Constructor of QueryInsightsExporterFactory
40+
*
41+
* @param client OS client
42+
*/
43+
public QueryInsightsExporterFactory(final Client client) {
44+
this.client = client;
45+
this.exporters = new HashSet<>();
46+
}
47+
48+
/**
49+
* Validate exporter sink config
50+
*
51+
* @param settings exporter sink config {@link Settings}
52+
* @throws IllegalArgumentException if provided exporter sink config settings are invalid
53+
*/
54+
public void validateExporterConfig(final Settings settings) throws IllegalArgumentException {
55+
// Disable exporter if the EXPORTER_TYPE setting is null
56+
if (settings.get(EXPORTER_TYPE) == null) {
57+
return;
58+
}
59+
SinkType type;
60+
try {
61+
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
62+
} catch (IllegalArgumentException e) {
63+
throw new IllegalArgumentException(
64+
String.format(
65+
Locale.ROOT,
66+
"Invalid exporter type [%s], type should be one of %s",
67+
settings.get(EXPORTER_TYPE),
68+
SinkType.allSinkTypes()
69+
)
70+
);
71+
}
72+
switch (type) {
73+
case LOCAL_INDEX:
74+
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
75+
if (indexPattern.length() == 0) {
76+
throw new IllegalArgumentException("Empty index pattern configured for the exporter");
77+
}
78+
try {
79+
DateTimeFormat.forPattern(indexPattern);
80+
} catch (Exception e) {
81+
throw new IllegalArgumentException(
82+
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern)
83+
);
84+
}
85+
}
86+
}
87+
88+
/**
89+
* Create an exporter based on provided parameters
90+
*
91+
* @param type The type of exporter to create
92+
* @param indexPattern the index pattern if creating a index exporter
93+
* @return QueryInsightsExporter the created exporter sink
94+
*/
95+
public QueryInsightsExporter createExporter(SinkType type, String indexPattern) {
96+
if (SinkType.LOCAL_INDEX.equals(type)) {
97+
QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern));
98+
this.exporters.add(exporter);
99+
return exporter;
100+
}
101+
return DebugExporter.getInstance();
102+
}
103+
104+
/**
105+
* Update an exporter based on provided parameters
106+
*
107+
* @param exporter The exporter to update
108+
* @param indexPattern the index pattern if creating a index exporter
109+
* @return QueryInsightsExporter the updated exporter sink
110+
*/
111+
public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) {
112+
if (exporter.getClass() == LocalIndexExporter.class) {
113+
((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern));
114+
}
115+
return exporter;
116+
}
117+
118+
/**
119+
* Close an exporter
120+
*
121+
* @param exporter the exporter to close
122+
*/
123+
public void closeExporter(QueryInsightsExporter exporter) throws IOException {
124+
if (exporter != null) {
125+
exporter.close();
126+
this.exporters.remove(exporter);
127+
}
128+
}
129+
130+
/**
131+
* Close all exporters
132+
*
133+
*/
134+
public void closeAllExporters() {
135+
for (QueryInsightsExporter exporter : exporters) {
136+
try {
137+
closeExporter(exporter);
138+
} catch (IOException e) {
139+
logger.error("Fail to close query insights exporter, error: ", e);
140+
}
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)