Skip to content

Commit 791ab01

Browse files
Fixed bug, added unit test
Signed-off-by: Martin Gaievski <[email protected]>
1 parent 398d60d commit 791ab01

File tree

2 files changed

+285
-10
lines changed

2 files changed

+285
-10
lines changed

server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ protected void onRequestProcessorFailed(Processor processor) {
216216
}
217217

218218
protected void beforeTransformResponse() {
219-
super.beforeTransformRequest();
219+
super.beforeTransformResponse();
220220
totalResponseMetrics.before();
221221
pipelineResponseMetrics.before();
222222
}
@@ -247,23 +247,20 @@ protected void onResponseProcessorFailed(Processor processor) {
247247

248248
@Override
249249
protected void beforeTransformPhaseResults() {
250-
super.beforeTransformRequest();
251-
totalRequestMetrics.before();
252-
pipelineRequestMetrics.before();
250+
super.beforeTransformPhaseResults();
251+
totalPhaseResultsMetrics.before();
253252
}
254253

255254
@Override
256255
protected void afterTransformPhaseResults(long timeInNanos) {
257-
super.afterTransformRequest(timeInNanos);
258-
totalRequestMetrics.after(timeInNanos);
259-
pipelineRequestMetrics.after(timeInNanos);
256+
super.afterTransformPhaseResults(timeInNanos);
257+
totalPhaseResultsMetrics.after(timeInNanos);
260258
}
261259

262260
@Override
263261
protected void onTransformPhaseResultsFailure() {
264-
super.onTransformRequestFailure();
265-
totalRequestMetrics.failed();
266-
pipelineRequestMetrics.failed();
262+
super.onTransformPhaseResultsFailure();
263+
totalPhaseResultsMetrics.failed();
267264
}
268265

269266
protected void beforePhaseResultsProcessor(Processor processor) {
@@ -283,6 +280,7 @@ void copyMetrics(PipelineWithMetrics oldPipeline) {
283280
pipelineResponseMetrics.add(oldPipeline.pipelineResponseMetrics);
284281
copyProcessorMetrics(requestProcessorMetrics, oldPipeline.requestProcessorMetrics);
285282
copyProcessorMetrics(responseProcessorMetrics, oldPipeline.responseProcessorMetrics);
283+
copyProcessorMetrics(phaseResultsProcessorMetrics, oldPipeline.phaseResultsProcessorMetrics);
286284
}
287285

288286
private static <T extends Processor> void copyProcessorMetrics(
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.pipeline;
10+
11+
import org.opensearch.action.search.SearchPhaseContext;
12+
import org.opensearch.action.search.SearchPhaseName;
13+
import org.opensearch.action.search.SearchPhaseResults;
14+
import org.opensearch.action.search.SearchRequest;
15+
import org.opensearch.action.search.SearchResponse;
16+
import org.opensearch.common.metrics.OperationMetrics;
17+
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
19+
import org.opensearch.search.SearchPhaseResult;
20+
import org.opensearch.test.OpenSearchTestCase;
21+
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
import org.mockito.Mock;
29+
import org.mockito.MockitoAnnotations;
30+
31+
import static org.mockito.ArgumentMatchers.any;
32+
import static org.mockito.Mockito.doAnswer;
33+
import static org.mockito.Mockito.when;
34+
35+
public class PipelineWithMetricsTests extends OpenSearchTestCase {
36+
37+
@Mock
38+
private SearchRequestProcessor mockRequestProcessor;
39+
@Mock
40+
private SearchResponseProcessor mockResponseProcessor;
41+
@Mock
42+
private SearchPhaseResultsProcessor mockPhaseResultsProcessor;
43+
@Mock
44+
private SearchRequest mockSearchRequest;
45+
@Mock
46+
private SearchResponse mockSearchResponse;
47+
@Mock
48+
private SearchPhaseResults<SearchPhaseResult> mockPhaseResults;
49+
@Mock
50+
private SearchPhaseContext mockPhaseContext;
51+
@Mock
52+
private NamedWriteableRegistry mockNamedWriteableRegistry;
53+
54+
@Override
55+
public void setUp() throws Exception {
56+
super.setUp();
57+
MockitoAnnotations.openMocks(this);
58+
}
59+
60+
public void testRequestProcessorMetricsCollection() throws Exception {
61+
// Setup
62+
when(mockRequestProcessor.getType()).thenReturn("test-request-processor");
63+
when(mockRequestProcessor.getTag()).thenReturn("tag1");
64+
when(mockRequestProcessor.isIgnoreFailure()).thenReturn(false);
65+
66+
doAnswer(invocation -> {
67+
ActionListener<SearchRequest> listener = invocation.getArgument(2);
68+
// Simulate some processing time
69+
Thread.sleep(10);
70+
listener.onResponse(mockSearchRequest);
71+
return null;
72+
}).when(mockRequestProcessor).processRequestAsync(any(), any(), any());
73+
74+
OperationMetrics totalRequestMetrics = new OperationMetrics();
75+
OperationMetrics totalResponseMetrics = new OperationMetrics();
76+
OperationMetrics totalPhaseMetrics = new OperationMetrics();
77+
78+
PipelineWithMetrics pipeline = new PipelineWithMetrics(
79+
"test-pipeline",
80+
"Test pipeline",
81+
1,
82+
List.of(mockRequestProcessor),
83+
Collections.emptyList(),
84+
Collections.emptyList(),
85+
mockNamedWriteableRegistry,
86+
totalRequestMetrics,
87+
totalResponseMetrics,
88+
totalPhaseMetrics,
89+
System::nanoTime
90+
);
91+
92+
// Execute
93+
CountDownLatch latch = new CountDownLatch(1);
94+
AtomicReference<SearchRequest> result = new AtomicReference<>();
95+
AtomicReference<Exception> error = new AtomicReference<>();
96+
97+
pipeline.transformRequest(mockSearchRequest, ActionListener.wrap(
98+
request -> {
99+
result.set(request);
100+
latch.countDown();
101+
},
102+
exception -> {
103+
error.set(exception);
104+
latch.countDown();
105+
}
106+
), new PipelineProcessingContext());
107+
108+
assertTrue("Request processing should complete within timeout", latch.await(5, TimeUnit.SECONDS));
109+
assertNull("No error should occur", error.get());
110+
assertNotNull("Result should not be null", result.get());
111+
112+
// Verify metrics were collected correctly
113+
assertTrue("Total request metrics should have at least one execution", totalRequestMetrics.createStats().getCount() > 0);
114+
assertTrue("Total request metrics should have recorded time", totalRequestMetrics.createStats().getTotalTimeInMillis() >= 0);
115+
116+
// Verify stats builder works correctly
117+
SearchPipelineStats.Builder statsBuilder = new SearchPipelineStats.Builder();
118+
statsBuilder.withTotalStats(totalRequestMetrics, totalResponseMetrics, totalPhaseMetrics);
119+
pipeline.populateStats(statsBuilder);
120+
SearchPipelineStats stats = statsBuilder.build();
121+
122+
assertNotNull("Stats should not be null", stats);
123+
assertEquals("Should have one pipeline", 1, stats.getPipelineStats().size());
124+
125+
// Verify processor stats are included
126+
String expectedProcessorKey = "test-request-processor:tag1";
127+
assertTrue("Should contain processor stats",
128+
stats.getPerPipelineProcessorStats().get("test-pipeline")
129+
.requestProcessorStats().stream()
130+
.anyMatch(ps -> ps.getProcessorName().equals(expectedProcessorKey))
131+
);
132+
}
133+
134+
public void testPhaseResultsProcessorMetricsCollection() throws Exception {
135+
// Setup
136+
when(mockPhaseResultsProcessor.getType()).thenReturn("test-phase-processor");
137+
when(mockPhaseResultsProcessor.getTag()).thenReturn("phase-tag");
138+
when(mockPhaseResultsProcessor.isIgnoreFailure()).thenReturn(false);
139+
when(mockPhaseResultsProcessor.getBeforePhase()).thenReturn(SearchPhaseName.QUERY);
140+
when(mockPhaseResultsProcessor.getAfterPhase()).thenReturn(SearchPhaseName.FETCH);
141+
142+
doAnswer(invocation -> {
143+
// Simulate some processing time
144+
Thread.sleep(10);
145+
return null;
146+
}).when(mockPhaseResultsProcessor).process(any(), any(), any());
147+
148+
OperationMetrics totalRequestMetrics = new OperationMetrics();
149+
OperationMetrics totalResponseMetrics = new OperationMetrics();
150+
OperationMetrics totalPhaseMetrics = new OperationMetrics();
151+
152+
PipelineWithMetrics pipeline = new PipelineWithMetrics(
153+
"test-pipeline",
154+
"Test pipeline",
155+
1,
156+
Collections.emptyList(),
157+
Collections.emptyList(),
158+
List.of(mockPhaseResultsProcessor),
159+
mockNamedWriteableRegistry,
160+
totalRequestMetrics,
161+
totalResponseMetrics,
162+
totalPhaseMetrics,
163+
System::nanoTime
164+
);
165+
166+
// Execute
167+
pipeline.runSearchPhaseResultsTransformer(
168+
mockPhaseResults,
169+
mockPhaseContext,
170+
"query",
171+
"fetch",
172+
new PipelineProcessingContext()
173+
);
174+
175+
// Verify phase results metrics were collected correctly
176+
assertTrue("Total phase metrics should have at least one execution", totalPhaseMetrics.createStats().getCount() > 0);
177+
assertTrue("Total phase metrics should have recorded time", totalPhaseMetrics.createStats().getTotalTimeInMillis() >= 0);
178+
179+
// Verify stats builder works correctly
180+
SearchPipelineStats.Builder statsBuilder = new SearchPipelineStats.Builder();
181+
statsBuilder.withTotalStats(totalRequestMetrics, totalResponseMetrics, totalPhaseMetrics);
182+
pipeline.populateStats(statsBuilder);
183+
SearchPipelineStats stats = statsBuilder.build();
184+
185+
// Verify processor stats are included
186+
String expectedProcessorKey = "test-phase-processor:phase-tag";
187+
assertTrue("Should contain phase processor stats",
188+
stats.getPerPipelineProcessorStats().get("test-pipeline")
189+
.phaseResultsProcessorStats().stream()
190+
.anyMatch(ps -> ps.getProcessorName().equals(expectedProcessorKey))
191+
);
192+
}
193+
194+
public void testCopyMetricsIncludesPhaseProcessors() {
195+
// Setup old pipeline with some metrics
196+
OperationMetrics oldTotalRequest = new OperationMetrics();
197+
OperationMetrics oldTotalResponse = new OperationMetrics();
198+
OperationMetrics oldTotalPhase = new OperationMetrics();
199+
200+
when(mockPhaseResultsProcessor.getType()).thenReturn("test-phase-processor");
201+
when(mockPhaseResultsProcessor.getTag()).thenReturn("phase-tag");
202+
203+
PipelineWithMetrics oldPipeline = new PipelineWithMetrics(
204+
"old-pipeline",
205+
"Old pipeline",
206+
1,
207+
Collections.emptyList(),
208+
Collections.emptyList(),
209+
List.of(mockPhaseResultsProcessor),
210+
mockNamedWriteableRegistry,
211+
oldTotalRequest,
212+
oldTotalResponse,
213+
oldTotalPhase,
214+
System::nanoTime
215+
);
216+
217+
// Simulate some executions on old pipeline to generate metrics
218+
oldTotalRequest.before();
219+
oldTotalRequest.after(1000000L); // 1ms in nanos
220+
oldTotalResponse.before();
221+
oldTotalResponse.after(2000000L); // 2ms in nanos
222+
oldTotalPhase.before();
223+
oldTotalPhase.after(3000000L); // 3ms in nanos
224+
225+
// Create new pipeline
226+
OperationMetrics newTotalRequest = new OperationMetrics();
227+
OperationMetrics newTotalResponse = new OperationMetrics();
228+
OperationMetrics newTotalPhase = new OperationMetrics();
229+
230+
PipelineWithMetrics newPipeline = new PipelineWithMetrics(
231+
"new-pipeline",
232+
"New pipeline",
233+
1,
234+
Collections.emptyList(),
235+
Collections.emptyList(),
236+
List.of(mockPhaseResultsProcessor),
237+
mockNamedWriteableRegistry,
238+
newTotalRequest,
239+
newTotalResponse,
240+
newTotalPhase,
241+
System::nanoTime
242+
);
243+
244+
// Copy metrics
245+
newPipeline.copyMetrics(oldPipeline);
246+
247+
// Verify that metrics were copied (this test would fail before our fix)
248+
SearchPipelineStats.Builder statsBuilder = new SearchPipelineStats.Builder();
249+
statsBuilder.withTotalStats(newTotalRequest, newTotalResponse, newTotalPhase);
250+
newPipeline.populateStats(statsBuilder);
251+
SearchPipelineStats stats = statsBuilder.build();
252+
253+
// The phase processor metrics should be preserved during copy
254+
assertFalse(
255+
"Phase processor stats should exist after copy",
256+
stats.getPerPipelineProcessorStats().get("new-pipeline").phaseResultsProcessorStats().isEmpty()
257+
);
258+
}
259+
260+
/**
261+
* Mock class for SearchPipelineExecutionPhase since it may not be available in test classpath
262+
*/
263+
private enum SearchPipelineExecutionPhase {
264+
QUERY("query"),
265+
FETCH("fetch");
266+
267+
private final String name;
268+
269+
SearchPipelineExecutionPhase(String name) {
270+
this.name = name;
271+
}
272+
273+
public String getName() {
274+
return name;
275+
}
276+
}
277+
}

0 commit comments

Comments
 (0)