Skip to content

Commit f827460

Browse files
committed
add stats for text embedding processors with flags
Signed-off-by: will-hwang <[email protected]>
1 parent 33b399b commit f827460

File tree

5 files changed

+51
-1
lines changed

5 files changed

+51
-1
lines changed

src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public void doExecute(
7272
generateAndSetInference(ingestDocument, processMap, inferenceList, handler);
7373
return;
7474
}
75+
EventStatsManager.increment(EventStatName.TEXT_EMBEDDING_PROCESSOR_SKIP_EXISTING_EXECUTIONS);
7576
// if skipExisting flag is turned on, eligible inference texts will be compared and filtered after embeddings are copied
7677
Object index = ingestDocument.getSourceAndMetadata().get(INDEX_FIELD);
7778
Object id = ingestDocument.getSourceAndMetadata().get(ID_FIELD);

src/main/java/org/opensearch/neuralsearch/stats/events/EventStatName.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
@Getter
2020
public enum EventStatName implements StatName {
2121
TEXT_EMBEDDING_PROCESSOR_EXECUTIONS("text_embedding_executions", "processors.ingest", EventStatType.TIMESTAMPED_EVENT_COUNTER),
22+
TEXT_EMBEDDING_PROCESSOR_SKIP_EXISTING_EXECUTIONS(
23+
"text_embedding_skip_existing_executions",
24+
"processors.ingest",
25+
EventStatType.TIMESTAMPED_EVENT_COUNTER
26+
),
2227
TEXT_CHUNKING_PROCESSOR_EXECUTIONS("text_chunking_executions", "processors.ingest", EventStatType.TIMESTAMPED_EVENT_COUNTER),
2328
TEXT_CHUNKING_FIXED_LENGTH_EXECUTIONS(
2429
"text_chunking_fixed_length_executions",

src/main/java/org/opensearch/neuralsearch/stats/info/InfoStatName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public enum InfoStatName implements StatName {
2121
// Cluster info
2222
CLUSTER_VERSION("cluster_version", "", InfoStatType.INFO_STRING),
2323
TEXT_EMBEDDING_PROCESSORS("text_embedding_processors_in_pipelines", "processors.ingest", InfoStatType.INFO_COUNTER),
24+
TEXT_EMBEDDING_SKIP_EXISTING_PROCESSORS("text_embedding_skip_existing_processors", "processors.ingest", InfoStatType.INFO_COUNTER),
2425
TEXT_CHUNKING_PROCESSORS("text_chunking_processors", "processors.ingest", InfoStatType.INFO_COUNTER),
2526
TEXT_CHUNKING_DELIMITER_PROCESSORS("text_chunking_delimiter_processors", "processors.ingest", InfoStatType.INFO_COUNTER),
2627
TEXT_CHUNKING_FIXED_LENGTH_PROCESSORS("text_chunking_fixed_length_processors", "processors.ingest", InfoStatType.INFO_COUNTER);

src/main/java/org/opensearch/neuralsearch/stats/info/InfoStatsManager.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.HashMap;
1818
import java.util.List;
1919
import java.util.Map;
20+
import java.util.Objects;
2021
import java.util.stream.Collectors;
2122

2223
/**
@@ -130,7 +131,7 @@ private void addIngestProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot
130131
Map<String, Object> processorConfig = asMap(entry.getValue());
131132
switch (processorType) {
132133
case TextEmbeddingProcessor.TYPE:
133-
increment(stats, InfoStatName.TEXT_EMBEDDING_PROCESSORS);
134+
countTextEmbeddingProcessorStats(stats, processorConfig);
134135
break;
135136
case TextChunkingProcessor.TYPE:
136137
countTextChunkingProcessorStats(stats, processorConfig);
@@ -141,6 +142,19 @@ private void addIngestProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot
141142
}
142143
}
143144

145+
/**
146+
* Counts text embedding processor stats based on processor config
147+
* @param stats map containing the stat to increment
148+
* @param processorConfig map of the processor config, parsed to add stats
149+
*/
150+
private void countTextEmbeddingProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot> stats, Map<String, Object> processorConfig) {
151+
increment(stats, InfoStatName.TEXT_EMBEDDING_PROCESSORS);
152+
Object skipExisting = processorConfig.get(TextEmbeddingProcessor.SKIP_EXISTING);
153+
if (Objects.nonNull(skipExisting) && skipExisting.equals(Boolean.TRUE)) {
154+
increment(stats, InfoStatName.TEXT_EMBEDDING_SKIP_EXISTING_PROCESSORS);
155+
}
156+
}
157+
144158
/**
145159
* Counts text chunking processor stats based on processor config
146160
* @param stats map containing the stat to increment

src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.net.URL;
1010
import java.nio.file.Files;
1111
import java.nio.file.Path;
12+
import java.util.ArrayList;
1213
import java.util.Collections;
1314
import java.util.HashMap;
1415
import java.util.List;
@@ -31,6 +32,8 @@
3132

3233
import com.google.common.collect.ImmutableList;
3334
import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
35+
import org.opensearch.neuralsearch.stats.events.EventStatName;
36+
import org.opensearch.neuralsearch.stats.info.InfoStatName;
3437

3538
public class TextEmbeddingProcessorIT extends BaseNeuralSearchIT {
3639

@@ -478,4 +481,30 @@ public void testTextEmbeddingProcessorWithReindexOperation() throws Exception {
478481
reindex(fromIndexName, toIndexName);
479482
assertEquals(1, getDocCount(toIndexName));
480483
}
484+
485+
public void testTextEmbeddingProcessor_processorStats_successful() throws Exception {
486+
updateClusterSettings("plugins.neural_search.stats_enabled", true);
487+
String modelId = uploadTextEmbeddingModel();
488+
loadModel(modelId);
489+
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING_WITH_SKIP_EXISTING);
490+
createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
491+
ingestDocument(INDEX_NAME, INGEST_DOC1, "1");
492+
updateDocument(INDEX_NAME, UPDATE_DOC1, "1");
493+
assertEquals(1, getDocCount(INDEX_NAME));
494+
assertEquals(2, getDocById(INDEX_NAME, "1").get("_version"));
495+
// Get stats
496+
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
497+
Map<String, Object> stats = parseInfoStatsResponse(responseBody);
498+
Map<String, Object> allNodesStats = parseAggregatedNodeStatsResponse(responseBody);
499+
500+
// Parse json to get stats
501+
assertEquals(2, getNestedValue(allNodesStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
502+
assertEquals(2, getNestedValue(allNodesStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_SKIP_EXISTING_EXECUTIONS));
503+
504+
assertEquals(1, getNestedValue(stats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
505+
assertEquals(1, getNestedValue(stats, InfoStatName.TEXT_EMBEDDING_SKIP_EXISTING_PROCESSORS));
506+
// Reset stats
507+
updateClusterSettings("plugins.neural_search.stats_enabled", false);
508+
}
509+
481510
}

0 commit comments

Comments
 (0)