29
29
@ TestMethodOrder (MethodOrderer .OrderAnnotation .class )
30
30
@ Slf4j
31
31
public class ServerTest {
32
-
33
32
@ Test
34
33
@ Order (1 )
35
34
public void testMapServerInvocation () {
@@ -73,10 +72,8 @@ public void testFlatMapServerInvocation() {
73
72
}
74
73
75
74
MapperTestKit .Client client = new MapperTestKit .Client ();
76
- MapperTestKit .TestDatum datum = MapperTestKit .TestDatum
77
- .builder ()
78
- .value ("apple,banana,carrot" .getBytes ())
79
- .build ();
75
+ MapperTestKit .TestDatum datum =
76
+ MapperTestKit .TestDatum .builder ().value ("apple,banana,carrot" .getBytes ()).build ();
80
77
81
78
MessageList result = client .sendRequest (new String []{}, datum );
82
79
@@ -113,26 +110,27 @@ public void testReduceServerInvocation() {
113
110
// create 10 datum with values 1 to 10
114
111
List <Datum > datumList = new ArrayList <>();
115
112
for (int i = 1 ; i <= 10 ; i ++) {
116
- datumList .add (ReducerTestKit .TestDatum
117
- .builder ()
118
- .value (Integer .toString (i ).getBytes ())
119
- .build ());
113
+ datumList .add (
114
+ ReducerTestKit .TestDatum
115
+ .builder ()
116
+ .value (Integer .toString (i ).getBytes ())
117
+ .build ());
120
118
}
121
119
122
120
// create a client and send requests to the server
123
121
ReducerTestKit .Client client = new ReducerTestKit .Client ();
124
122
125
- ReducerTestKit .TestReduceRequest testReduceRequest = ReducerTestKit . TestReduceRequest
126
- .builder ()
127
- .datumList (datumList )
128
- .keys (new String []{"test-key" })
129
- .startTime (Instant .ofEpochSecond (60000 ))
130
- .endTime (Instant .ofEpochSecond (60010 ))
131
- .build ();
123
+ ReducerTestKit .TestReduceRequest testReduceRequest =
124
+ ReducerTestKit . TestReduceRequest .builder ()
125
+ .datumList (datumList )
126
+ .keys (new String []{"test-key" })
127
+ .startTime (Instant .ofEpochSecond (60000 ))
128
+ .endTime (Instant .ofEpochSecond (60010 ))
129
+ .build ();
132
130
133
131
try {
134
- io .numaproj .numaflow .reducer .MessageList messageList = client . sendReduceRequest (
135
- testReduceRequest );
132
+ io .numaproj .numaflow .reducer .MessageList messageList =
133
+ client . sendReduceRequest ( testReduceRequest );
136
134
// check if the response is correct
137
135
if (messageList .getMessages ().size () != 1 ) {
138
136
Assertions .fail ("Expected 1 message in the response" );
@@ -169,12 +167,12 @@ public void testSinkServerInvocation() {
169
167
// Create a test datum iterator with 10 messages
170
168
SinkerTestKit .TestListIterator testListIterator = new SinkerTestKit .TestListIterator ();
171
169
for (int i = 0 ; i < datumCount ; i ++) {
172
- testListIterator .addDatum (SinkerTestKit . TestDatum
173
- .builder ()
174
- .id ("id-" + i )
175
- .value (("value-" + i ).getBytes ())
176
- .headers (Map .of ("test-key" , "test-value" ))
177
- .build ());
170
+ testListIterator .addDatum (
171
+ SinkerTestKit . TestDatum .builder ()
172
+ .id ("id-" + i )
173
+ .value (("value-" + i ).getBytes ())
174
+ .headers (Map .of ("test-key" , "test-value" ))
175
+ .build ());
178
176
}
179
177
180
178
SinkerTestKit .Client client = new SinkerTestKit .Client ();
@@ -199,47 +197,51 @@ public void testSinkServerInvocation() {
199
197
// we can add the logic to verify if the messages were
200
198
// successfully written to the sink(could be a file, database, etc.)
201
199
}
202
- // FIXME: once tester kit changes are done for bidirectional streaming source
203
- // @Ignore
204
- // @Test
205
- // @Order(5)
206
- // public void testSourceServerInvocation() {
207
- // SimpleSource simpleSource = new SimpleSource();
208
- //
209
- // SourcerTestKit sourcerTestKit = new SourcerTestKit(simpleSource);
210
- // try {
211
- // sourcerTestKit.startServer();
212
- // } catch (Exception e) {
213
- // Assertions.fail("Failed to start server");
214
- // }
215
- //
216
- // // create a client to send requests to the server
217
- // SourcerTestKit.Client sourcerClient = new SourcerTestKit.Client();
218
- // // create a test observer to receive messages from the server
219
- // SourcerTestKit.TestListBasedObserver testObserver = new SourcerTestKit.TestListBasedObserver();
220
- // // create a read request with count 10 and timeout 1 second
221
- // SourcerTestKit.TestReadRequest testReadRequest = SourcerTestKit.TestReadRequest.builder()
222
- // .count(10).timeout(Duration.ofSeconds(1)).build();
223
- //
224
- // try {
225
- // sourcerClient.sendReadRequest(testReadRequest, testObserver);
226
- // Assertions.assertEquals(10, testObserver.getMessages().size());
227
- // } catch (Exception e) {
228
- // Assertions.fail("Failed to send request to server");
229
- // }
230
- //
231
- // try {
232
- // sourcerClient.close();
233
- // sourcerTestKit.stopServer();
234
- // } catch (InterruptedException e) {
235
- // Assertions.fail("Failed to stop server");
236
- // }
237
- // }
200
+
201
+ // FIXME: once tester kit changes are done for bidirectional streaming source
202
+ // @Ignore
203
+ // @Test
204
+ // @Order(5)
205
+ // public void testSourceServerInvocation() {
206
+ // SimpleSource simpleSource = new SimpleSource();
207
+ //
208
+ // SourcerTestKit sourcerTestKit = new SourcerTestKit(simpleSource);
209
+ // try {
210
+ // sourcerTestKit.startServer();
211
+ // } catch (Exception e) {
212
+ // Assertions.fail("Failed to start server");
213
+ // }
214
+ //
215
+ // // create a client to send requests to the server
216
+ // SourcerTestKit.Client sourcerClient = new SourcerTestKit.Client();
217
+ // // create a test observer to receive messages from the server
218
+ // SourcerTestKit.TestListBasedObserver testObserver = new
219
+ // SourcerTestKit.TestListBasedObserver();
220
+ // // create a read request with count 10 and timeout 1 second
221
+ // SourcerTestKit.TestReadRequest testReadRequest =
222
+ // SourcerTestKit.TestReadRequest.builder()
223
+ // .count(10).timeout(Duration.ofSeconds(1)).build();
224
+ //
225
+ // try {
226
+ // sourcerClient.sendReadRequest(testReadRequest, testObserver);
227
+ // Assertions.assertEquals(10, testObserver.getMessages().size());
228
+ // } catch (Exception e) {
229
+ // Assertions.fail("Failed to send request to server");
230
+ // }
231
+ //
232
+ // try {
233
+ // sourcerClient.close();
234
+ // sourcerTestKit.stopServer();
235
+ // } catch (InterruptedException e) {
236
+ // Assertions.fail("Failed to stop server");
237
+ // }
238
+ // }
238
239
239
240
@ Test
240
241
@ Order (6 )
241
242
public void testSourceTransformerServerInvocation () {
242
- SourceTransformerTestKit sourceTransformerTestKit = new SourceTransformerTestKit (new EventTimeFilterFunction ());
243
+ SourceTransformerTestKit sourceTransformerTestKit =
244
+ new SourceTransformerTestKit (new EventTimeFilterFunction ());
243
245
try {
244
246
sourceTransformerTestKit .startServer ();
245
247
} catch (Exception e ) {
@@ -249,13 +251,13 @@ public void testSourceTransformerServerInvocation() {
249
251
// Create a client which can send requests to the server
250
252
SourceTransformerTestKit .Client client = new SourceTransformerTestKit .Client ();
251
253
252
- SourceTransformerTestKit .TestDatum datum = SourceTransformerTestKit . TestDatum . builder ()
253
- . eventTime ( Instant . ofEpochMilli ( 1640995200000L ) )
254
- . value ( "test" . getBytes ( ))
255
- . build ();
256
- io . numaproj . numaflow . sourcetransformer . MessageList result = client . sendRequest (
257
- new String []{},
258
- datum );
254
+ SourceTransformerTestKit .TestDatum datum =
255
+ SourceTransformerTestKit . TestDatum . builder ( )
256
+ . eventTime ( Instant . ofEpochMilli ( 1640995200000L ))
257
+ . value ( "test" . getBytes ())
258
+ . build ();
259
+ io . numaproj . numaflow . sourcetransformer . MessageList result =
260
+ client . sendRequest ( new String []{}, datum );
259
261
260
262
List <io .numaproj .numaflow .sourcetransformer .Message > messages = result .getMessages ();
261
263
Assertions .assertEquals (1 , messages .size ());
0 commit comments