|
4 | 4 | */
|
5 | 5 | package org.opensearch.neuralsearch.stats.info;
|
6 | 6 |
|
| 7 | +import org.opensearch.neuralsearch.processor.NormalizationProcessor; |
| 8 | +import org.opensearch.neuralsearch.processor.RRFProcessor; |
7 | 9 | import org.opensearch.neuralsearch.processor.TextChunkingProcessor;
|
8 | 10 | import org.opensearch.neuralsearch.processor.TextEmbeddingProcessor;
|
| 11 | +import org.opensearch.neuralsearch.processor.combination.ArithmeticMeanScoreCombinationTechnique; |
| 12 | +import org.opensearch.neuralsearch.processor.combination.GeometricMeanScoreCombinationTechnique; |
| 13 | +import org.opensearch.neuralsearch.processor.combination.HarmonicMeanScoreCombinationTechnique; |
| 14 | +import org.opensearch.neuralsearch.processor.factory.NormalizationProcessorFactory; |
| 15 | +import org.opensearch.neuralsearch.processor.normalization.L2ScoreNormalizationTechnique; |
| 16 | +import org.opensearch.neuralsearch.processor.normalization.MinMaxScoreNormalizationTechnique; |
| 17 | +import org.opensearch.neuralsearch.processor.normalization.ZScoreNormalizationTechnique; |
9 | 18 | import org.opensearch.neuralsearch.processor.chunker.DelimiterChunker;
|
10 | 19 | import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;
|
11 | 20 | import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
|
|
24 | 33 | */
|
25 | 34 | public class InfoStatsManager {
|
26 | 35 | public static final String PROCESSORS_KEY = "processors";
|
| 36 | + public static final String PHASE_RESULTS_PROCESSORS_KEY = "phase_results_processors"; |
27 | 37 |
|
28 | 38 | private final NeuralSearchClusterUtil neuralSearchClusterUtil;
|
29 | 39 | private final NeuralSearchSettingsAccessor settingsAccessor;
|
@@ -83,6 +93,9 @@ private Map<InfoStatName, CountableInfoStatSnapshot> getCountableStats() {
|
83 | 93 | // Parses ingest pipeline processor configs for processor info
|
84 | 94 | addIngestProcessorStats(countableInfoStats);
|
85 | 95 |
|
| 96 | + // Parses search pipeline processor configs for processor info |
| 97 | + addSearchProcessorStats(countableInfoStats); |
| 98 | + |
86 | 99 | // Helpers to parse search pipeline processor configs for processor info would go here
|
87 | 100 | return countableInfoStats;
|
88 | 101 | }
|
@@ -162,6 +175,81 @@ private void countTextChunkingProcessorStats(Map<InfoStatName, CountableInfoStat
|
162 | 175 | }
|
163 | 176 | }
|
164 | 177 |
|
| 178 | + /** |
| 179 | + * Adds search processor info stats, mutating the input |
| 180 | + * @param stats mutable map of info stats that the result will be added to |
| 181 | + */ |
| 182 | + private void addSearchProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot> stats) { |
| 183 | + List<Map<String, Object>> pipelineConfigs = pipelineServiceUtil.getSearchPipelineConfigs(); |
| 184 | + |
| 185 | + // Iterate through all search processors and count their stats individually by calling helpers |
| 186 | + for (Map<String, Object> pipelineConfig : pipelineConfigs) { |
| 187 | + // Search phase results processors |
| 188 | + List<Map<String, Object>> phaseResultsProcessors = asListOfMaps(pipelineConfig.get(PHASE_RESULTS_PROCESSORS_KEY)); |
| 189 | + for (Map<String, Object> phaseResultsProcessor : phaseResultsProcessors) { |
| 190 | + for (Map.Entry<String, Object> entry : phaseResultsProcessor.entrySet()) { |
| 191 | + String processorType = entry.getKey(); |
| 192 | + Map<String, Object> processorConfig = asMap(entry.getValue()); |
| 193 | + switch (processorType) { |
| 194 | + case NormalizationProcessor.TYPE: |
| 195 | + countNormalizationProcessorStats(stats, processorConfig); |
| 196 | + break; |
| 197 | + case RRFProcessor.TYPE: |
| 198 | + countRRFProcessorStats(stats, processorConfig); |
| 199 | + break; |
| 200 | + } |
| 201 | + } |
| 202 | + } |
| 203 | + } |
| 204 | + } |
| 205 | + |
| 206 | + private void countNormalizationProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot> stats, Map<String, Object> processorConfig) { |
| 207 | + increment(stats, InfoStatName.NORMALIZATION_PROCESSORS); |
| 208 | + |
| 209 | + String normalizationTechnique = asString( |
| 210 | + asMap(processorConfig.get(NormalizationProcessorFactory.NORMALIZATION_CLAUSE)).get(NormalizationProcessorFactory.TECHNIQUE) |
| 211 | + ); |
| 212 | + String combinationTechnique = asString( |
| 213 | + asMap(processorConfig.get(NormalizationProcessorFactory.COMBINATION_CLAUSE)).get(NormalizationProcessorFactory.TECHNIQUE) |
| 214 | + ); |
| 215 | + |
| 216 | + countNormalizationTechniqueStats(stats, normalizationTechnique); |
| 217 | + countCombinationTechniqueStats(stats, combinationTechnique); |
| 218 | + } |
| 219 | + |
| 220 | + private void countRRFProcessorStats(Map<InfoStatName, CountableInfoStatSnapshot> stats, Map<String, Object> processorConfig) { |
| 221 | + increment(stats, InfoStatName.RRF_PROCESSORS); |
| 222 | + |
| 223 | + // RRF only has combination technique |
| 224 | + String combinationTechnique = asString( |
| 225 | + asMap(processorConfig.get(NormalizationProcessorFactory.COMBINATION_CLAUSE)).get(NormalizationProcessorFactory.TECHNIQUE) |
| 226 | + ); |
| 227 | + |
| 228 | + countCombinationTechniqueStats(stats, combinationTechnique); |
| 229 | + } |
| 230 | + |
| 231 | + private void countNormalizationTechniqueStats(Map<InfoStatName, CountableInfoStatSnapshot> stats, String normalizationTechnique) { |
| 232 | + switch (normalizationTechnique) { |
| 233 | + case L2ScoreNormalizationTechnique.TECHNIQUE_NAME -> increment(stats, InfoStatName.NORM_TECHNIQUE_L2_PROCESSORS); |
| 234 | + case MinMaxScoreNormalizationTechnique.TECHNIQUE_NAME -> increment(stats, InfoStatName.NORM_TECHNIQUE_MINMAX_PROCESSORS); |
| 235 | + case ZScoreNormalizationTechnique.TECHNIQUE_NAME -> increment(stats, InfoStatName.NORM_TECHNIQUE_ZSCORE_PROCESSORS); |
| 236 | + } |
| 237 | + } |
| 238 | + |
| 239 | + private void countCombinationTechniqueStats(Map<InfoStatName, CountableInfoStatSnapshot> stats, String combinationTechnique) { |
| 240 | + switch (combinationTechnique) { |
| 241 | + case ArithmeticMeanScoreCombinationTechnique.TECHNIQUE_NAME -> increment( |
| 242 | + stats, |
| 243 | + InfoStatName.COMB_TECHNIQUE_ARITHMETIC_PROCESSORS |
| 244 | + ); |
| 245 | + case GeometricMeanScoreCombinationTechnique.TECHNIQUE_NAME -> increment( |
| 246 | + stats, |
| 247 | + InfoStatName.COMB_TECHNIQUE_GEOMETRIC_PROCESSORS |
| 248 | + ); |
| 249 | + case HarmonicMeanScoreCombinationTechnique.TECHNIQUE_NAME -> increment(stats, InfoStatName.COMB_TECHNIQUE_HARMONIC_PROCESSORS); |
| 250 | + } |
| 251 | + } |
| 252 | + |
165 | 253 | /**
|
166 | 254 | * Increments a countable info stat in the given stat name
|
167 | 255 | * @param stats map containing the stat to increment
|
@@ -203,6 +291,17 @@ private Map<String, Object> asMap(Object value) {
|
203 | 291 | return value instanceof Map ? (Map<String, Object>) value : null;
|
204 | 292 | }
|
205 | 293 |
|
| 294 | + /** |
| 295 | + * Helper to cast generic object into String or null |
| 296 | + * Used to parse pipeline processor configs |
| 297 | + * @param value the object |
| 298 | + * @return the string or null if not a string |
| 299 | + */ |
| 300 | + @SuppressWarnings("unchecked") |
| 301 | + private String asString(Object value) { |
| 302 | + return value instanceof String ? (String) value : null; |
| 303 | + } |
| 304 | + |
206 | 305 | /**
|
207 | 306 | * Helper to cast generic object into a list of Map<String, Object>
|
208 | 307 | * Used to parse pipeline processor configs
|
|
0 commit comments