
Commit fc77246

q-andy authored and YeonghyeonKO committed
Add TextChunkingProcessor stats (opensearch-project#1308)
* Add TextChunkingProcessor stats

Signed-off-by: Andy Qin <[email protected]>

# Conflicts:
#	CHANGELOG.md

* Update unit and integ tests

Signed-off-by: Andy Qin <[email protected]>

---------

Signed-off-by: Andy Qin <[email protected]>
Signed-off-by: yeonghyeonKo <[email protected]>
1 parent 5ad2a14 commit fc77246

File tree: 13 files changed (+219, -12 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ### Enhancements
 - [Performance Improvement] Add custom bulk scorer for hybrid query (2-3x faster) ([#1289](https://github.com/opensearch-project/neural-search/pull/1289))
+- [Stats] Add stats for text chunking processor algorithms ([#1308](https://github.com/opensearch-project/neural-search/pull/1308))
 
 ### Bug Fixes

src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java

Lines changed: 12 additions & 0 deletions

@@ -23,7 +23,10 @@
 import org.opensearch.neuralsearch.processor.chunker.Chunker;
 import org.opensearch.index.mapper.IndexFieldMapper;
 import org.opensearch.neuralsearch.processor.chunker.ChunkerFactory;
+import org.opensearch.neuralsearch.processor.chunker.DelimiterChunker;
 import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;
+import org.opensearch.neuralsearch.stats.events.EventStatName;
+import org.opensearch.neuralsearch.stats.events.EventStatsManager;
 import org.opensearch.neuralsearch.util.ProcessorDocumentUtils;
 
 import static org.opensearch.neuralsearch.processor.chunker.Chunker.MAX_CHUNK_LIMIT_FIELD;

@@ -192,6 +195,7 @@ public IngestDocument execute(final IngestDocument ingestDocument) {
         runtimeParameters.put(MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
         runtimeParameters.put(CHUNK_STRING_COUNT_FIELD, chunkStringCount);
         chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters);
+        recordChunkingExecutionStats(chunker.getAlgorithmName());
         return ingestDocument;
     }

@@ -316,4 +320,12 @@ private List<String> chunkLeafType(final Object value, final Map<String, Object>
         }
         return result;
     }
+
+    private void recordChunkingExecutionStats(String algorithmName) {
+        EventStatsManager.increment(EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS);
+        switch (algorithmName) {
+            case DelimiterChunker.ALGORITHM_NAME -> EventStatsManager.increment(EventStatName.TEXT_CHUNKING_DELIMITER_EXECUTIONS);
+            case FixedTokenLengthChunker.ALGORITHM_NAME -> EventStatsManager.increment(EventStatName.TEXT_CHUNKING_FIXED_LENGTH_EXECUTIONS);
+        }
+    }
 }
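For context, a minimal sketch of how the counters recorded here can be read back, using only the EventStatsManager snapshot API that this commit's unit tests exercise. The helper class and method name below are hypothetical, not part of the change:

    import java.util.EnumSet;
    import java.util.Map;

    import org.opensearch.neuralsearch.stats.events.EventStatName;
    import org.opensearch.neuralsearch.stats.events.EventStatsManager;
    import org.opensearch.neuralsearch.stats.events.TimestampedEventStatSnapshot;

    // Hypothetical reader, for illustration only.
    public final class ChunkingStatsReader {
        public static long totalChunkingExecutions() {
            // Snapshot every registered event stat, then read the chunking counter,
            // mirroring the pattern used in TextChunkingProcessorTests below.
            Map<EventStatName, TimestampedEventStatSnapshot> snapshots = EventStatsManager.instance()
                .getTimestampedEventStatSnapshots(EnumSet.allOf(EventStatName.class));
            return snapshots.get(EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS).getValue().longValue();
        }
    }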

src/main/java/org/opensearch/neuralsearch/processor/chunker/Chunker.java

Lines changed: 2 additions & 0 deletions

@@ -54,4 +54,6 @@ public interface Chunker {
     static boolean checkRunTimeMaxChunkLimit(int chunkResultSize, int runtimeMaxChunkLimit, int chunkStringCount) {
         return runtimeMaxChunkLimit != DISABLED_MAX_CHUNK_LIMIT && chunkResultSize + chunkStringCount >= runtimeMaxChunkLimit;
     }
+
+    String getAlgorithmName();
 }
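To illustrate the new interface contract, a minimal hypothetical implementation. SentenceChunker does not exist in the codebase, and the chunk signature is assumed from the existing chunkers in this diff:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical example only: every Chunker must now report a stable
    // algorithm name so executions can be attributed to per-algorithm stats.
    public class SentenceChunker implements Chunker {
        public static final String ALGORITHM_NAME = "sentence";

        @Override
        public List<String> chunk(final String content, final Map<String, Object> runtimeParameters) {
            // Naive sentence split for illustration; real chunkers also honor
            // runtime parameters such as MAX_CHUNK_LIMIT_FIELD.
            return new ArrayList<>(List.of(content.split("(?<=[.!?])\\s+")));
        }

        @Override
        public String getAlgorithmName() {
            return ALGORITHM_NAME;
        }
    }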

src/main/java/org/opensearch/neuralsearch/processor/chunker/DelimiterChunker.java

Lines changed: 5 additions & 0 deletions

@@ -83,4 +83,9 @@ public List<String> chunk(final String content, final Map<String, Object> runtim
 
         return chunkResult;
     }
+
+    @Override
+    public String getAlgorithmName() {
+        return ALGORITHM_NAME;
+    }
 }

src/main/java/org/opensearch/neuralsearch/processor/chunker/FixedTokenLengthChunker.java

Lines changed: 5 additions & 0 deletions

@@ -180,4 +180,9 @@ private List<AnalyzeToken> tokenize(final String content, final String tokenizer
             throw new IllegalStateException(String.format(Locale.ROOT, "analyzer %s throws exception: %s", tokenizer, e.getMessage()), e);
         }
     }
+
+    @Override
+    public String getAlgorithmName() {
+        return ALGORITHM_NAME;
+    }
 }

src/main/java/org/opensearch/neuralsearch/stats/events/EventStatName.java

Lines changed: 8 additions & 1 deletion

@@ -18,7 +18,14 @@
  */
 @Getter
 public enum EventStatName implements StatName {
-    TEXT_EMBEDDING_PROCESSOR_EXECUTIONS("text_embedding_executions", "processors.ingest", EventStatType.TIMESTAMPED_EVENT_COUNTER);
+    TEXT_EMBEDDING_PROCESSOR_EXECUTIONS("text_embedding_executions", "processors.ingest", EventStatType.TIMESTAMPED_EVENT_COUNTER),
+    TEXT_CHUNKING_PROCESSOR_EXECUTIONS("text_chunking_executions", "processors.ingest", EventStatType.TIMESTAMPED_EVENT_COUNTER),
+    TEXT_CHUNKING_FIXED_LENGTH_EXECUTIONS(
+        "text_chunking_fixed_length_executions",
+        "processors.ingest",
+        EventStatType.TIMESTAMPED_EVENT_COUNTER
+    ),
+    TEXT_CHUNKING_DELIMITER_EXECUTIONS("text_chunking_delimiter_executions", "processors.ingest", EventStatType.TIMESTAMPED_EVENT_COUNTER);
 
     private final String nameString;
     private final String path;
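A quick sketch of how a consumer might address one of these counters. The assumption that the full stat path combines as "<path>.<nameString>" (for example "processors.ingest.text_chunking_executions") is inferred from the getNestedValue lookups in the integration test below, not stated in this file:

    import org.opensearch.neuralsearch.stats.events.EventStatName;

    // Hypothetical helper, not part of this change. Lombok's @Getter on the
    // enum generates getNameString() and getPath() for the fields above.
    public final class StatPathExample {
        public static String fullPath() {
            EventStatName stat = EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS;
            return stat.getPath() + "." + stat.getNameString();  // assumed layout
        }
    }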

src/main/java/org/opensearch/neuralsearch/stats/info/InfoStatName.java

Lines changed: 4 additions & 1 deletion

@@ -20,7 +20,10 @@
 public enum InfoStatName implements StatName {
     // Cluster info
     CLUSTER_VERSION("cluster_version", "", InfoStatType.INFO_STRING),
-    TEXT_EMBEDDING_PROCESSORS("text_embedding_processors_in_pipelines", "processors.ingest", InfoStatType.INFO_COUNTER);
+    TEXT_EMBEDDING_PROCESSORS("text_embedding_processors_in_pipelines", "processors.ingest", InfoStatType.INFO_COUNTER),
+    TEXT_CHUNKING_PROCESSORS("text_chunking_processors", "processors.ingest", InfoStatType.INFO_COUNTER),
+    TEXT_CHUNKING_DELIMITER_PROCESSORS("text_chunking_delimiter_processors", "processors.ingest", InfoStatType.INFO_COUNTER),
+    TEXT_CHUNKING_FIXED_LENGTH_PROCESSORS("text_chunking_fixed_length_processors", "processors.ingest", InfoStatType.INFO_COUNTER);
 
     private final String nameString;
     private final String path;

src/main/java/org/opensearch/neuralsearch/stats/info/InfoStatsManager.java

Lines changed: 28 additions & 0 deletions

@@ -4,7 +4,10 @@
  */
 package org.opensearch.neuralsearch.stats.info;
 
+import org.opensearch.neuralsearch.processor.TextChunkingProcessor;
 import org.opensearch.neuralsearch.processor.TextEmbeddingProcessor;
+import org.opensearch.neuralsearch.processor.chunker.DelimiterChunker;
+import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;
 import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
 import org.opensearch.neuralsearch.stats.common.StatSnapshot;
 import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;

@@ -118,6 +121,7 @@ private void addClusterVersionStat(Map<InfoStatName, SettableInfoStatSnapshot<?>
     private void addIngestProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot> stats) {
         List<Map<String, Object>> pipelineConfigs = pipelineServiceUtil.getIngestPipelineConfigs();
 
+        // Iterate through all ingest processors and count their stats individually by calling helpers
         for (Map<String, Object> pipelineConfig : pipelineConfigs) {
             List<Map<String, Object>> ingestProcessors = asListOfMaps(pipelineConfig.get(PROCESSORS_KEY));
             for (Map<String, Object> ingestProcessor : ingestProcessors) {

@@ -128,12 +132,36 @@ private void addIngestProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot
                     case TextEmbeddingProcessor.TYPE:
                         increment(stats, InfoStatName.TEXT_EMBEDDING_PROCESSORS);
                         break;
+                    case TextChunkingProcessor.TYPE:
+                        countTextChunkingProcessorStats(stats, processorConfig);
+                        break;
                 }
             }
         }
     }
 
+    /**
+     * Counts text chunking processor stats based on processor config
+     * @param stats map containing the stat to increment
+     * @param processorConfig map of the processor config, parsed to add stats
+     */
+    private void countTextChunkingProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot> stats, Map<String, Object> processorConfig) {
+        increment(stats, InfoStatName.TEXT_CHUNKING_PROCESSORS);
+
+        Map<String, Object> algorithmMap = asMap(processorConfig.get(TextChunkingProcessor.ALGORITHM_FIELD));
+
+        Map.Entry<String, Object> algorithmEntry = algorithmMap.entrySet().iterator().next();
+        String algorithmKey = algorithmEntry.getKey();
+
+        switch (algorithmKey) {
+            case DelimiterChunker.ALGORITHM_NAME -> increment(stats, InfoStatName.TEXT_CHUNKING_DELIMITER_PROCESSORS);
+            case FixedTokenLengthChunker.ALGORITHM_NAME -> increment(stats, InfoStatName.TEXT_CHUNKING_FIXED_LENGTH_PROCESSORS);
+            // If no algorithm is specified, the default is fixed length
+            default -> increment(stats, InfoStatName.TEXT_CHUNKING_FIXED_LENGTH_PROCESSORS);
+        }
+    }
+
 /**
  * Increments a countable info stat in the given stat name
 * @param stats map containing the stat to increment
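For reference, a sketch of the processor config shape that countTextChunkingProcessorStats walks, as implied by the code above. That TextChunkingProcessor.ALGORITHM_FIELD renders as "algorithm" in pipeline JSON, and the nested parameter names, are assumptions:

    import java.util.Map;

    // Hypothetical illustration, not part of this change.
    public final class ChunkingConfigExample {
        public static Map<String, Object> delimiterProcessorConfig() {
            // Assumed shape: the algorithm map holds exactly one entry keyed by
            // algorithm name, so the parser reads that key ("delimiter" here) and
            // increments TEXT_CHUNKING_DELIMITER_PROCESSORS; any unrecognized key
            // falls through to the fixed-length counter, the documented default.
            return Map.of("algorithm", Map.of("delimiter", Map.of("delimiter", "\n\n")));
        }
    }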

src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java

Lines changed: 58 additions & 2 deletions

@@ -17,9 +17,12 @@
 
 import org.opensearch.index.query.MatchAllQueryBuilder;
 import org.opensearch.neuralsearch.BaseNeuralSearchIT;
+import org.opensearch.neuralsearch.stats.events.EventStatName;
+import org.opensearch.neuralsearch.stats.info.InfoStatName;
 
 public class TextChunkingProcessorIT extends BaseNeuralSearchIT {
     private static final String INDEX_NAME = "text_chunking_test_index";
+    private static final String INDEX_NAME2 = "text_chunking_test_index_2nd";
 
     private static final String OUTPUT_FIELD = "body_chunk";

@@ -167,8 +170,57 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken
         assertEquals(1, getDocCount(INDEX_NAME));
     }
 
-    private void validateIndexIngestResults(String indexName, String fieldName, Object expected) {
-        assertEquals(1, getDocCount(indexName));
+    @SneakyThrows
+    public void testTextChunkingProcessor_processorStats_successful() {
+        updateClusterSettings("plugins.neural_search.stats_enabled", true);
+        createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
+        createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
+
+        // Creating an extra fixed length pipeline
+        createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME);
+
+        createPipelineProcessor(DELIMITER_PIPELINE_NAME);
+        createTextChunkingIndex(INDEX_NAME2, DELIMITER_PIPELINE_NAME);
+
+        String document = getDocumentFromFilePath(TEST_DOCUMENT);
+        ingestDocument(INDEX_NAME, document);
+        ingestDocument(INDEX_NAME, document);
+
+        List<String> expectedPassages = new ArrayList<>();
+        expectedPassages.add("This is an example document to be chunked. The document ");
+        expectedPassages.add("contains a single paragraph, two sentences and 24 tokens by ");
+        expectedPassages.add("standard tokenizer in OpenSearch.");
+        validateIndexIngestResultsWithMultipleDocs(INDEX_NAME, OUTPUT_FIELD, expectedPassages, 2);
+
+        ingestDocument(INDEX_NAME2, document);
+        ingestDocument(INDEX_NAME2, document);
+        ingestDocument(INDEX_NAME2, document);
+
+        expectedPassages = new ArrayList<>();
+        expectedPassages.add("This is an example document to be chunked.");
+        expectedPassages.add(" The document contains a single paragraph, two sentences and 24 tokens by standard tokenizer in OpenSearch.");
+        validateIndexIngestResultsWithMultipleDocs(INDEX_NAME2, OUTPUT_FIELD, expectedPassages, 3);
+
+        // Get stats
+        String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
+        Map<String, Object> stats = parseInfoStatsResponse(responseBody);
+        Map<String, Object> allNodesStats = parseAggregatedNodeStatsResponse(responseBody);
+
+        // Parse json to get stats
+        assertEquals(5, getNestedValue(allNodesStats, EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS));
+        assertEquals(3, getNestedValue(allNodesStats, EventStatName.TEXT_CHUNKING_DELIMITER_EXECUTIONS));
+        assertEquals(2, getNestedValue(allNodesStats, EventStatName.TEXT_CHUNKING_FIXED_LENGTH_EXECUTIONS));
+
+        assertEquals(3, getNestedValue(stats, InfoStatName.TEXT_CHUNKING_PROCESSORS));
+        assertEquals(1, getNestedValue(stats, InfoStatName.TEXT_CHUNKING_DELIMITER_PROCESSORS));
+        assertEquals(2, getNestedValue(stats, InfoStatName.TEXT_CHUNKING_FIXED_LENGTH_PROCESSORS));
+
+        // Reset stats
+        updateClusterSettings("plugins.neural_search.stats_enabled", false);
+    }
+
+    private void validateIndexIngestResultsWithMultipleDocs(String indexName, String fieldName, Object expected, int docCount) {
+        assertEquals(docCount, getDocCount(indexName));
         MatchAllQueryBuilder query = new MatchAllQueryBuilder();
         Map<String, Object> searchResults = search(indexName, query, 10);
         assertNotNull(searchResults);

@@ -183,6 +235,10 @@ private void validateIndexIngestResults(String indexName, String fieldName, Obje
         assertEquals(expected, ingestOutputs);
     }
 
+    private void validateIndexIngestResults(String indexName, String fieldName, Object expected) {
+        validateIndexIngestResultsWithMultipleDocs(indexName, fieldName, expected, 1);
+    }
+
     private void createPipelineProcessor(String pipelineName) throws Exception {
         URL pipelineURLPath = classLoader.getResource(PIPELINE_CONFIGS_BY_NAME.get(pipelineName));
         Objects.requireNonNull(pipelineURLPath);
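A sanity check of the totals asserted above, restated from the test itself (event stats count per-document executions, info stats count processors declared in pipeline configs):

    // Event stats: one increment per ingested document.
    int fixedLengthExecutions = 2;   // two docs ingested into INDEX_NAME
    int delimiterExecutions = 3;     // three docs ingested into INDEX_NAME2
    assert fixedLengthExecutions + delimiterExecutions == 5;

    // Info stats: one count per chunking processor in a pipeline config,
    // regardless of how many documents ran through it.
    int fixedLengthPipelines = 2;    // standard-tokenizer plus lowercase-tokenizer pipelines
    int delimiterPipelines = 1;
    assert fixedLengthPipelines + delimiterPipelines == 3;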

src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorTests.java

Lines changed: 74 additions & 0 deletions

@@ -9,6 +9,7 @@
 import org.apache.lucene.tests.analysis.MockTokenizer;
 import org.junit.Before;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;

@@ -37,6 +38,11 @@
 import org.opensearch.neuralsearch.processor.chunker.DelimiterChunker;
 import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;
 import org.opensearch.neuralsearch.processor.factory.TextChunkingProcessorFactory;
+import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
+import org.opensearch.neuralsearch.stats.events.EventStatName;
+import org.opensearch.neuralsearch.stats.events.EventStatsManager;
+import org.opensearch.neuralsearch.stats.events.TimestampedEventStatSnapshot;
+import org.opensearch.neuralsearch.util.TestUtils;
 import org.opensearch.plugins.AnalysisPlugin;
 import org.opensearch.test.OpenSearchTestCase;
 import static org.opensearch.neuralsearch.processor.TextChunkingProcessor.TYPE;

@@ -92,6 +98,8 @@ public void setup() {
         when(clusterState.metadata()).thenReturn(metadata);
         when(clusterService.state()).thenReturn(clusterState);
         textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, getAnalysisRegistry());
+
+        TestUtils.initializeEventStatsManager();
     }
 
     private Map<String, Object> createFixedTokenLengthParameters() {

@@ -972,4 +980,70 @@ public void testExecute_withIgnoreMissing_thenSucceed() {
         IngestDocument document = processor.execute(ingestDocument);
         assertFalse(document.getSourceAndMetadata().containsKey(OUTPUT_FIELD));
     }
+
+    @SneakyThrows
+    public void testExecute_statsDisabled_thenSucceed() {
+        NeuralSearchSettingsAccessor settingsAccessor = mock(NeuralSearchSettingsAccessor.class);
+        when(settingsAccessor.isStatsEnabled()).thenReturn(false);
+        EventStatsManager.instance().initialize(settingsAccessor);
+
+        TextChunkingProcessor processor = createFixedTokenLengthInstance(createStringFieldMap());
+        IngestDocument ingestDocument = createIngestDocumentWithSourceData(createSourceDataString());
+        IngestDocument document = processor.execute(ingestDocument);
+        assert document.getSourceAndMetadata().containsKey(OUTPUT_FIELD);
+        Object passages = document.getSourceAndMetadata().get(OUTPUT_FIELD);
+        assert (passages instanceof List<?>);
+        List<String> expectedPassages = new ArrayList<>();
+        expectedPassages.add("This is an example document to be chunked. The document ");
+        expectedPassages.add("contains a single paragraph, two sentences and 24 tokens by ");
+        expectedPassages.add("standard tokenizer in OpenSearch.");
+        assertEquals(expectedPassages, passages);
+
+        Map<EventStatName, TimestampedEventStatSnapshot> snapshots = EventStatsManager.instance()
+            .getTimestampedEventStatSnapshots(EnumSet.allOf(EventStatName.class));
+
+        assertEquals(0L, snapshots.get(EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS).getValue().longValue());
+        assertEquals(0L, snapshots.get(EventStatName.TEXT_CHUNKING_FIXED_LENGTH_EXECUTIONS).getValue().longValue());
+    }
+
+    @SneakyThrows
+    public void testExecute_statsEnabled_withFixedTokenLength_andSourceDataString_thenSucceed() {
+        TextChunkingProcessor processor = createFixedTokenLengthInstance(createStringFieldMap());
+        IngestDocument ingestDocument = createIngestDocumentWithSourceData(createSourceDataString());
+        IngestDocument document = processor.execute(ingestDocument);
+        assert document.getSourceAndMetadata().containsKey(OUTPUT_FIELD);
+        Object passages = document.getSourceAndMetadata().get(OUTPUT_FIELD);
+        assert (passages instanceof List<?>);
+        List<String> expectedPassages = new ArrayList<>();
+        expectedPassages.add("This is an example document to be chunked. The document ");
+        expectedPassages.add("contains a single paragraph, two sentences and 24 tokens by ");
+        expectedPassages.add("standard tokenizer in OpenSearch.");
+        assertEquals(expectedPassages, passages);
+
+        Map<EventStatName, TimestampedEventStatSnapshot> snapshots = EventStatsManager.instance()
+            .getTimestampedEventStatSnapshots(EnumSet.allOf(EventStatName.class));
+
+        assertEquals(1L, snapshots.get(EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS).getValue().longValue());
+        assertEquals(1L, snapshots.get(EventStatName.TEXT_CHUNKING_FIXED_LENGTH_EXECUTIONS).getValue().longValue());
+    }
+
+    @SneakyThrows
+    public void testExecute_statsEnabled_withDelimiter_andSourceDataString_thenSucceed() {
+        TextChunkingProcessor processor = createDelimiterInstance();
+        IngestDocument ingestDocument = createIngestDocumentWithSourceData(createSourceDataString());
+        IngestDocument document = processor.execute(ingestDocument);
+        assert document.getSourceAndMetadata().containsKey(OUTPUT_FIELD);
+        Object passages = document.getSourceAndMetadata().get(OUTPUT_FIELD);
+        assert (passages instanceof List<?>);
+        List<String> expectedPassages = new ArrayList<>();
+        expectedPassages.add("This is an example document to be chunked.");
+        expectedPassages.add(" The document contains a single paragraph, two sentences and 24 tokens by standard tokenizer in OpenSearch.");
+        assertEquals(expectedPassages, passages);
+
+        Map<EventStatName, TimestampedEventStatSnapshot> snapshots = EventStatsManager.instance()
+            .getTimestampedEventStatSnapshots(EnumSet.allOf(EventStatName.class));
+
+        assertEquals(1L, snapshots.get(EventStatName.TEXT_CHUNKING_PROCESSOR_EXECUTIONS).getValue().longValue());
+        assertEquals(1L, snapshots.get(EventStatName.TEXT_CHUNKING_DELIMITER_EXECUTIONS).getValue().longValue());
+    }
 }

src/test/java/org/opensearch/neuralsearch/stats/events/EventStatNameTests.java

Lines changed: 3 additions & 1 deletion

@@ -45,9 +45,11 @@ public void test_validNames() {
 
     public void test_uniquePaths() {
         Set<String> paths = new HashSet<>();
+
+        // First pass to add all base paths (excluding stat names) to avoid colliding a stat name with a terminal path
+        // e.g. if a.b is a stat, a.b.c cannot be a stat.
         for (EventStatName statName : EVENT_STATS) {
             String path = statName.getPath().toLowerCase(Locale.ROOT);
-            assertFalse(String.format(Locale.ROOT, "Checking path uniqueness for %s", path), paths.contains(path));
             paths.add(path);
         }
