@@ -268,24 +268,15 @@ public boolean calculate(AbstractWindow window, IMessage message) {
268
268
} catch (Exception e ) {
269
269
throw new RuntimeException ("failed in window value calculating" ,e );
270
270
}
271
-
272
- //there is no need writing back to message
273
-
274
271
return true ;
275
272
}
276
273
277
- protected static AtomicInteger SUM =new AtomicInteger (0 );
278
-
279
274
protected void calFunctionColumn (AbstractWindow window , IMessage message ) {
280
275
String introduction = (String )message .getMessageBody ().getOrDefault (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY , "" );
281
276
boolean isMultiAccumulate = AggregationScript .INNER_AGGREGATION_COMPUTE_MULTI .equals (introduction );
282
277
if (isMultiAccumulate ){
283
278
WindowValue windowValue =message .getMessageBody ().getObject (WindowValue .class .getName (),WindowValue .class );
284
- try {
285
- // windowValue= SerializeUtil.deserialize(windowValueJson,WindowValue.class);
286
- }catch (Exception e ){
287
- throw new RuntimeException ("window value deserializeObject error" ,e );
288
- }
279
+
289
280
List <WindowValue > windowValues =new ArrayList <>();
290
281
windowValues .add (this );
291
282
windowValues .add (windowValue );
@@ -304,6 +295,7 @@ protected void calFunctionColumn(AbstractWindow window, IMessage message) {
304
295
AggregationScript originAccScript = (AggregationScript ) executor ;
305
296
AggregationScript windowAccScript = originAccScript .clone ();
306
297
Object accumulator = null ;
298
+
307
299
if (aggColumnResult .containsKey (executorName )) {
308
300
accumulator = aggColumnResult .get (executorName );
309
301
} else {
@@ -312,19 +304,18 @@ protected void calFunctionColumn(AbstractWindow window, IMessage message) {
312
304
accumulator = director .createAccumulator ();
313
305
aggColumnResult .put (executorName , accumulator );
314
306
}
307
+
315
308
windowAccScript .setAccumulator (accumulator );
316
- if (!isMultiAccumulate ){
317
- message .getMessageBody ().put (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY ,
318
- AggregationScript .INNER_AGGREGATION_COMPUTE_SINGLE );
319
- }
309
+ message .getMessageBody ().put (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY , AggregationScript .INNER_AGGREGATION_COMPUTE_SINGLE );
310
+
320
311
FunctionContext context = new FunctionContext (message );
321
312
windowAccScript .doMessage (message , context );
322
313
} else if (executor instanceof FunctionScript ) {
323
314
FunctionContext context = new FunctionContext (message );
324
315
((FunctionScript ) executor ).doMessage (message , context );
325
316
}
326
317
}
327
- //
318
+
328
319
computedColumnResult .put (computedColumn , message .getMessageBody ().get (computedColumn ));
329
320
}
330
321
calProjectColumn (window , message );
@@ -346,9 +337,6 @@ protected void calProjectColumn(AbstractWindow window, IMessage message) {
346
337
}
347
338
348
339
349
- /**
350
- * merge the group which has the same group by value and different split id
351
- */
352
340
public static WindowValue mergeWindowValue (AbstractWindow window , List <WindowValue > valueList ) {
353
341
WindowValue lastWindowValue = new WindowValue (valueList .get (0 ));
354
342
lastWindowValue .setComputedColumnResult (valueList .get (0 ).getComputedColumnResult ());
@@ -363,17 +351,17 @@ public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowVal
363
351
for (FunctionExecutor info : executorList ) {
364
352
String column = info .getColumn ();
365
353
IStreamOperator <IMessage , List <IMessage >> engine = info .getExecutor ();
354
+
366
355
if (engine instanceof AggregationScript ) {
367
356
AggregationScript origin = (AggregationScript ) engine ;
368
357
AggregationScript operator = origin .clone ();
358
+
369
359
if (needMergeComputation ) {
370
- message .getMessageBody ().put (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY ,
371
- AggregationScript .INNER_AGGREGATION_COMPUTE_SINGLE );
360
+ message .getMessageBody ().put (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY , AggregationScript .INNER_AGGREGATION_COMPUTE_SINGLE );
372
361
operator .setAccumulator (operator .getDirector ().createAccumulator ());
373
362
operator .doMessage (message , context );
374
363
} else {
375
- message .getMessageBody ().put (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY ,
376
- AggregationScript .INNER_AGGREGATION_COMPUTE_MULTI );
364
+ message .getMessageBody ().put (AggregationScript .INNER_AGGREGATION_COMPUTE_KEY , AggregationScript .INNER_AGGREGATION_COMPUTE_MULTI );
377
365
List actors = valueList .stream ().map (
378
366
windowValue -> {
379
367
Object accumulator = null ;
@@ -394,6 +382,7 @@ public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowVal
394
382
operator .doMessage (message , context );
395
383
needMergeComputation = true ;
396
384
}
385
+
397
386
} else if (engine instanceof FunctionScript ) {
398
387
FunctionScript theScript = (FunctionScript ) engine ;
399
388
String [] parameters = theScript .getDependentParameters ();
@@ -409,11 +398,11 @@ public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowVal
409
398
}
410
399
}
411
400
}
401
+
412
402
if (message .getMessageBody ().containsKey (computedColumn )) {
413
403
lastWindowValue .computedColumnResult .put (computedColumn , message .getMessageBody ().get (computedColumn ));
414
404
} else if (!needMergeComputation ) {
415
- lastWindowValue .computedColumnResult .put (computedColumn ,
416
- valueList .get (0 ).computedColumnResult .get (computedColumn ));
405
+ lastWindowValue .computedColumnResult .put (computedColumn , valueList .get (0 ).computedColumnResult .get (computedColumn ));
417
406
}
418
407
}
419
408
// valueList.stream().map(value -> lastWindowValue.count += value.getCount());
0 commit comments