Skip to content

Commit 6347bf6

Browse files
q-andy authored and YeonghyeonKO committed
Add neural stats API (opensearch-project#1256)
* Add neural stats API Signed-off-by: Andy Qin <[email protected]> Signed-off-by: yeonghyeonKo <[email protected]>
1 parent fa9ff0b commit 6347bf6

File tree

48 files changed

+4074
-6
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+4074
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1515
- Optimize embedding generation in Text/Image Embedding Processor ([#1249](https://github.com/opensearch-project/neural-search/pull/1249))
1616
- Inner hits support with hybrid query ([#1253](https://github.com/opensearch-project/neural-search/pull/1253))
1717
- Support custom tags in semantic highlighter ([#1254](https://github.com/opensearch-project/neural-search/pull/1254))
18+
- Add stats API ([#1256](https://github.com/opensearch-project/neural-search/pull/1256))
1819

1920
### Enhancements
2021

qa/restart-upgrade/build.gradle

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ def versionsBelow2_13 = versionsBelow2_12 + "2.12"
6161
def versionsBelow2_14 = versionsBelow2_13 + "2.13"
6262
def versionsBelow2_15 = versionsBelow2_14 + "2.14"
6363
def versionsBelow2_16 = versionsBelow2_15 + "2.15"
64+
def versionsBelow2_17 = versionsBelow2_16 + "2.16"
65+
def versionsBelow2_18 = versionsBelow2_17 + "2.17"
66+
def versionsBelow2_19 = versionsBelow2_18 + "2.18"
67+
def versionsBelow2_20 = versionsBelow2_19 + "2.19"
68+
def versionsBelow3_0 = versionsBelow2_20 + "2.20"
6469

6570
// Task to run BWC tests against the old cluster
6671
task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
@@ -114,6 +119,13 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
114119
}
115120
}
116121

122+
// Excluding stats tests because we introduce this feature in 3.0
123+
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
124+
filter {
125+
excludeTestsMatching "org.opensearch.neuralsearch.bwc.restart.RestNeuralStatsActionIT.*"
126+
}
127+
}
128+
117129
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
118130
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
119131
systemProperty 'tests.security.manager', 'false'
@@ -179,6 +191,13 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) {
179191
}
180192
}
181193

194+
// Excluding stats tests because we introduce this feature in 3.0
195+
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
196+
filter {
197+
excludeTestsMatching "org.opensearch.neuralsearch.bwc.restart.RestNeuralStatsActionIT.*"
198+
}
199+
}
200+
182201
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
183202
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
184203
systemProperty 'tests.security.manager', 'false'
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.neuralsearch.bwc.restart;
6+
7+
import org.opensearch.neuralsearch.stats.events.EventStatName;
8+
import org.opensearch.neuralsearch.stats.info.InfoStatName;
9+
10+
import java.nio.file.Files;
11+
import java.nio.file.Path;
12+
import java.util.ArrayList;
13+
import java.util.Map;
14+
15+
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
16+
import static org.opensearch.neuralsearch.util.TestUtils.TEXT_EMBEDDING_PROCESSOR;
17+
import static org.opensearch.neuralsearch.util.TestUtils.getModelId;
18+
19+
public class RestNeuralStatsActionIT extends AbstractRestartUpgradeRestTestCase {
20+
private static final String PIPELINE_NAME = "nlp-pipeline";
21+
private static final String TEST_FIELD = "passage_text";
22+
private static final String TEXT = "Hello world";
23+
private static final String TEXT_1 = "Hello world a";
24+
25+
// Test restart-upgrade with neural stats
26+
// Enabled/disabled settings should persist between restarts
27+
// Event stats should be reset on restart
28+
// Info stats based on persistent constructs should be persisted between restarts
29+
public void testNeuralStats_E2EFlow() throws Exception {
30+
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
31+
updateClusterSettings("plugins.neural_search.stats_enabled", true);
32+
33+
// Currently using text embedding processor executions stat since that's the only one implemented
34+
// Once other stats are implemented, it may be smarter to use those instead of text embedding processor
35+
// to avoid having to upload a model and run inference.
36+
if (isRunningAgainstOldCluster()) {
37+
String modelId = uploadTextEmbeddingModel();
38+
loadModel(modelId);
39+
createPipelineProcessor(modelId, PIPELINE_NAME);
40+
createIndexWithConfiguration(
41+
getIndexNameForTest(),
42+
Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())),
43+
PIPELINE_NAME
44+
);
45+
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
46+
47+
// Get stats request
48+
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
49+
Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
50+
Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);
51+
52+
assertEquals(1, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
53+
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
54+
} else {
55+
String modelId = null;
56+
try {
57+
modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
58+
loadModel(modelId);
59+
addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, null, null);
60+
addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_1, null, null);
61+
addDocument(getIndexNameForTest(), "3", TEST_FIELD, TEXT_1, null, null);
62+
63+
// Get stats request
64+
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
65+
Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
66+
Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);
67+
68+
assertEquals(3, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
69+
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
70+
} finally {
71+
wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
72+
}
73+
}
74+
}
75+
}

qa/rolling-upgrade/build.gradle

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ def versionsBelow2_13 = versionsBelow2_12 + "2.12"
6161
def versionsBelow2_14 = versionsBelow2_13 + "2.13"
6262
def versionsBelow2_15 = versionsBelow2_14 + "2.14"
6363
def versionsBelow2_16 = versionsBelow2_15 + "2.15"
64+
def versionsBelow2_17 = versionsBelow2_16 + "2.16"
65+
def versionsBelow2_18 = versionsBelow2_17 + "2.17"
66+
def versionsBelow2_19 = versionsBelow2_18 + "2.18"
67+
def versionsBelow2_20 = versionsBelow2_19 + "2.19"
68+
def versionsBelow3_0 = versionsBelow2_20 + "2.20"
6469

6570
// Task to run BWC tests against the old cluster
6671
task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
@@ -75,6 +80,13 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
7580
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
7681
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
7782
systemProperty 'tests.security.manager', 'false'
83+
84+
// Excluding stats tests because we introduce this feature in 3.0
85+
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
86+
filter {
87+
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
88+
}
89+
}
7890
}
7991

8092
// Part of rolling upgrade. Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version
@@ -100,6 +112,13 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) {
100112
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
101113
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
102114
systemProperty 'tests.security.manager', 'false'
115+
116+
// Excluding stats tests because we introduce this feature in 3.0
117+
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
118+
filter {
119+
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
120+
}
121+
}
103122
}
104123

105124
// Part of rolling upgrade. Upgrades the second node to new OpenSearch version with upgraded plugin version after the
@@ -124,6 +143,13 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) {
124143
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
125144
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
126145
systemProperty 'tests.security.manager', 'false'
146+
147+
// Excluding stats tests because we introduce this feature in 3.0
148+
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
149+
filter {
150+
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
151+
}
152+
}
127153
}
128154

129155
// Part of rolling upgrade. Upgrades the third node to new OpenSearch version with upgraded plugin version after the
@@ -148,4 +174,11 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) {
148174
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
149175
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
150176
systemProperty 'tests.security.manager', 'false'
177+
178+
// Excluding stats tests because we introduce this feature in 3.0
179+
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
180+
filter {
181+
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
182+
}
183+
}
151184
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.neuralsearch.bwc.rolling;
6+
7+
import org.opensearch.neuralsearch.stats.events.EventStatName;
8+
import org.opensearch.neuralsearch.stats.info.InfoStatName;
9+
10+
import java.nio.file.Files;
11+
import java.nio.file.Path;
12+
import java.util.ArrayList;
13+
import java.util.Map;
14+
15+
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
16+
17+
public class RestNeuralStatsActionIT extends AbstractRollingUpgradeTestCase {
18+
private static final String PIPELINE_NAME = "nlp-pipeline";
19+
private static final String TEST_FIELD = "passage_text";
20+
private static final String TEXT = "Hello world";
21+
private static final String TEXT_MIXED = "Hello world mixed";
22+
private static final String TEXT_UPGRADED = "Hello world upgraded";
23+
private static final int NUM_DOCS_PER_ROUND = 1;
24+
private static String modelId = "";
25+
26+
// Test rolling-upgrade neural stats action
27+
// Create Text Embedding Processor, Ingestion Pipeline and add document
28+
// Validate stats are correct during upgrade
29+
// When new stats are added, we will also want to validate handling fetching stats from previous versions
30+
// that don't have those stats.
31+
public void testStats_E2EFlow() throws Exception {
32+
waitForClusterHealthGreen(NODES_BWC_CLUSTER, 90);
33+
updateClusterSettings("plugins.neural_search.stats_enabled", true);
34+
35+
switch (getClusterType()) {
36+
case OLD:
37+
modelId = uploadTextEmbeddingModel();
38+
loadModel(modelId);
39+
createPipelineProcessor(modelId, PIPELINE_NAME);
40+
createIndexWithConfiguration(
41+
getIndexNameForTest(),
42+
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
43+
PIPELINE_NAME
44+
);
45+
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
46+
47+
// Get stats request
48+
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
49+
Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
50+
Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);
51+
52+
assertEquals(1, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
53+
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
54+
break;
55+
case MIXED:
56+
// Get stats request
57+
responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
58+
infoStats = parseInfoStatsResponse(responseBody);
59+
60+
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
61+
break;
62+
case UPGRADED:
63+
try {
64+
// Get stats request
65+
responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
66+
infoStats = parseInfoStatsResponse(responseBody);
67+
aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);
68+
69+
// After all nodes have be restarted, all event stats should be reset as well
70+
assertEquals(0, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
71+
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
72+
} finally {
73+
wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
74+
}
75+
break;
76+
default:
77+
throw new IllegalStateException("Unexpected value: " + getClusterType());
78+
}
79+
}
80+
}

src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_SEARCH_HYBRID_SEARCH_DISABLED;
88
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.RERANKER_MAX_DOC_FIELDS;
9+
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_STATS_ENABLED;
910

1011
import java.util.Arrays;
1112
import java.util.Collection;
@@ -19,12 +20,22 @@
1920
import org.opensearch.neuralsearch.highlight.SemanticHighlighter;
2021
import org.opensearch.neuralsearch.highlight.SemanticHighlighterEngine;
2122
import org.opensearch.neuralsearch.highlight.extractor.QueryTextExtractorRegistry;
23+
import com.google.common.collect.ImmutableList;
24+
import org.opensearch.action.ActionRequest;
25+
import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
26+
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
27+
import org.opensearch.neuralsearch.stats.info.InfoStatsManager;
2228
import org.opensearch.transport.client.Client;
2329
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
30+
import org.opensearch.cluster.node.DiscoveryNodes;
2431
import org.opensearch.cluster.service.ClusterService;
32+
import org.opensearch.common.settings.ClusterSettings;
33+
import org.opensearch.common.settings.IndexScopedSettings;
2534
import org.opensearch.common.settings.Setting;
2635
import org.opensearch.common.settings.Settings;
36+
import org.opensearch.common.settings.SettingsFilter;
2737
import org.opensearch.common.util.FeatureFlags;
38+
import org.opensearch.core.action.ActionResponse;
2839
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
2940
import org.opensearch.core.xcontent.NamedXContentRegistry;
3041
import org.opensearch.env.Environment;
@@ -59,15 +70,21 @@
5970
import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
6071
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
6172
import org.opensearch.neuralsearch.query.ext.RerankSearchExtBuilder;
73+
import org.opensearch.neuralsearch.rest.RestNeuralStatsAction;
6274
import org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher;
75+
import org.opensearch.neuralsearch.transport.NeuralStatsAction;
76+
import org.opensearch.neuralsearch.transport.NeuralStatsTransportAction;
6377
import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;
78+
import org.opensearch.neuralsearch.util.PipelineServiceUtil;
6479
import org.opensearch.plugins.ActionPlugin;
6580
import org.opensearch.plugins.ExtensiblePlugin;
6681
import org.opensearch.plugins.IngestPlugin;
6782
import org.opensearch.plugins.Plugin;
6883
import org.opensearch.plugins.SearchPipelinePlugin;
6984
import org.opensearch.plugins.SearchPlugin;
7085
import org.opensearch.repositories.RepositoriesService;
86+
import org.opensearch.rest.RestController;
87+
import org.opensearch.rest.RestHandler;
7188
import org.opensearch.script.ScriptService;
7289
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
7390
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
@@ -87,10 +104,14 @@
87104
public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin, IngestPlugin, ExtensiblePlugin, SearchPipelinePlugin {
88105
private MLCommonsClientAccessor clientAccessor;
89106
private NormalizationProcessorWorkflow normalizationProcessorWorkflow;
107+
private NeuralSearchSettingsAccessor settingsAccessor;
108+
private PipelineServiceUtil pipelineServiceUtil;
109+
private InfoStatsManager infoStatsManager;
90110
private final ScoreNormalizationFactory scoreNormalizationFactory = new ScoreNormalizationFactory();
91111
private final ScoreCombinationFactory scoreCombinationFactory = new ScoreCombinationFactory();
92112
private final SemanticHighlighter semanticHighlighter;
93113
public static final String EXPLANATION_RESPONSE_KEY = "explanation_response";
114+
public static final String NEURAL_BASE_URI = "/_plugins/_neural";
94115

95116
public NeuralSearch() {
96117
this.semanticHighlighter = new SemanticHighlighter();
@@ -121,7 +142,11 @@ public Collection<Object> createComponents(
121142
semanticHighlighter.initialize(semanticHighlighterEngine);
122143
HybridQueryExecutor.initialize(threadPool);
123144
normalizationProcessorWorkflow = new NormalizationProcessorWorkflow(new ScoreNormalizer(), new ScoreCombiner());
124-
return List.of(clientAccessor);
145+
settingsAccessor = new NeuralSearchSettingsAccessor(clusterService, environment.settings());
146+
pipelineServiceUtil = new PipelineServiceUtil(clusterService);
147+
infoStatsManager = new InfoStatsManager(NeuralSearchClusterUtil.instance(), settingsAccessor, pipelineServiceUtil);
148+
EventStatsManager.instance().initialize(settingsAccessor);
149+
return List.of(clientAccessor, EventStatsManager.instance(), infoStatsManager);
125150
}
126151

127152
@Override
@@ -133,6 +158,25 @@ public List<QuerySpec<?>> getQueries() {
133158
);
134159
}
135160

161+
@Override
162+
public List<RestHandler> getRestHandlers(
163+
Settings settings,
164+
RestController restController,
165+
ClusterSettings clusterSettings,
166+
IndexScopedSettings indexScopedSettings,
167+
SettingsFilter settingsFilter,
168+
IndexNameExpressionResolver indexNameExpressionResolver,
169+
Supplier<DiscoveryNodes> nodesInCluster
170+
) {
171+
RestNeuralStatsAction restNeuralStatsAction = new RestNeuralStatsAction(settingsAccessor);
172+
return ImmutableList.of(restNeuralStatsAction);
173+
}
174+
175+
@Override
176+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
177+
return Arrays.asList(new ActionHandler<>(NeuralStatsAction.INSTANCE, NeuralStatsTransportAction.class));
178+
}
179+
136180
@Override
137181
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
138182
return List.of(HybridQueryExecutor.getExecutorBuilder(settings));
@@ -198,7 +242,7 @@ public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchPhaseR
198242

199243
@Override
200244
public List<Setting<?>> getSettings() {
201-
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED, RERANKER_MAX_DOC_FIELDS);
245+
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED, RERANKER_MAX_DOC_FIELDS, NEURAL_STATS_ENABLED);
202246
}
203247

204248
@Override

0 commit comments

Comments
 (0)