Skip to content

Commit 93669b7

Browse files
committed
fix(common) remove slf4j-log4j12
1 parent 0b8cc34 commit 93669b7

File tree

4 files changed

+20
-54
lines changed

4 files changed

+20
-54
lines changed

rocketmq-streams-commons/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@
3030
<name>ROCKETMQ STREAMS :: commons</name>
3131
<packaging>jar</packaging>
3232
<dependencies>
33-
<dependency>
34-
<groupId>org.slf4j</groupId>
35-
<artifactId>slf4j-log4j12</artifactId>
36-
</dependency>
3733
<!-- http -->
3834
<dependency>
3935
<groupId>org.apache.commons</groupId>

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/ShuffleMessageCache.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,13 @@ protected boolean isOpenMiniBatch() {
135135
return isOpenMiniBatch;
136136
}
137137

138-
protected transient AtomicLong SUM=new AtomicLong(0);
138+
139139
protected JSONObject createMsg(String shuffleKey,WindowValue windowValue, MessageHeader messageHeader,JSONObject msgHeader) {
140140

141141
JSONObject message = new JSONObject();
142-
long start=System.currentTimeMillis();
143142
message.put(WindowValue.class.getName(), windowValue);
144-
// long sum=SUM.addAndGet(System.currentTimeMillis()-start);
145-
// System.out.println("create msg "+sum);
146143
message.put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY,AggregationScript.INNER_AGGREGATION_COMPUTE_MULTI);
144+
147145
IMessage windowValueMsg=new Message(message);
148146
windowValueMsg.setHeader(messageHeader);
149147
ShuffleUtil.createShuffleMsg(windowValueMsg,shuffleKey,msgHeader);

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.concurrent.Future;
26+
import java.util.concurrent.TimeUnit;
27+
2628
import org.apache.commons.lang3.tuple.Pair;
2729
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
2830
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -97,12 +99,7 @@ private void stateMustLoad(String queueId) {
9799
}
98100

99101
try {
100-
long before = System.currentTimeMillis();
101-
future.get();
102-
long after = System.currentTimeMillis();
103-
104-
System.out.println("message wait before state recover:[" + (after - before) + "] ms, queueId=" + queueId);
105-
102+
future.get(5, TimeUnit.SECONDS);
106103
for (String loadQueueId : loadResult.keySet()) {
107104
hasLoad.put(loadQueueId, true);
108105
}
@@ -111,15 +108,7 @@ private void stateMustLoad(String queueId) {
111108
}
112109
}
113110

114-
/**
115-
* save consumer progress(offset)for groupby source shuffleId
116-
* window configName: name_window_10001
117-
* shuffleId: shuffle_NormalTestTopic_namespace_name_broker-a_001
118-
* oriQueueId: NormalTestTopic2_broker-a_000
119-
*
120-
* @param shuffleId
121-
* @param messages
122-
*/
111+
123112
protected void saveSplitProgress(String shuffleId, List<IMessage> messages) {
124113
IStorage delegator = this.window.getStorage();
125114

@@ -144,13 +133,7 @@ protected void saveSplitProgress(String shuffleId, List<IMessage> messages) {
144133
}
145134

146135

147-
/**
148-
* 根据message,把message分组到不同的group,分别处理
149-
*
150-
* @param messageList
151-
* @param instance2Messages
152-
* @param windowInstanceMap
153-
*/
136+
154137
protected void groupByWindowInstanceAndQueueId(List<IMessage> messageList,
155138
Map<Pair<String, String>, List<IMessage>> instance2Messages, Map<String, WindowInstance> windowInstanceMap) {
156139
for (IMessage message : messageList) {

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -268,24 +268,15 @@ public boolean calculate(AbstractWindow window, IMessage message) {
268268
} catch (Exception e) {
269269
throw new RuntimeException("failed in window value calculating",e);
270270
}
271-
272-
//there is no need writing back to message
273-
274271
return true;
275272
}
276273

277-
protected static AtomicInteger SUM=new AtomicInteger(0);
278-
279274
protected void calFunctionColumn(AbstractWindow window, IMessage message) {
280275
String introduction = (String)message.getMessageBody().getOrDefault(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY, "");
281276
boolean isMultiAccumulate = AggregationScript.INNER_AGGREGATION_COMPUTE_MULTI.equals(introduction);
282277
if(isMultiAccumulate){
283278
WindowValue windowValue=message.getMessageBody().getObject(WindowValue.class.getName(),WindowValue.class);
284-
try {
285-
// windowValue= SerializeUtil.deserialize(windowValueJson,WindowValue.class);
286-
}catch (Exception e){
287-
throw new RuntimeException("window value deserializeObject error",e);
288-
}
279+
289280
List<WindowValue> windowValues=new ArrayList<>();
290281
windowValues.add(this);
291282
windowValues.add(windowValue);
@@ -304,6 +295,7 @@ protected void calFunctionColumn(AbstractWindow window, IMessage message) {
304295
AggregationScript originAccScript = (AggregationScript) executor;
305296
AggregationScript windowAccScript = originAccScript.clone();
306297
Object accumulator = null;
298+
307299
if (aggColumnResult.containsKey(executorName)) {
308300
accumulator = aggColumnResult.get(executorName);
309301
} else {
@@ -312,19 +304,18 @@ protected void calFunctionColumn(AbstractWindow window, IMessage message) {
312304
accumulator = director.createAccumulator();
313305
aggColumnResult.put(executorName, accumulator);
314306
}
307+
315308
windowAccScript.setAccumulator(accumulator);
316-
if(!isMultiAccumulate){
317-
message.getMessageBody().put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY,
318-
AggregationScript.INNER_AGGREGATION_COMPUTE_SINGLE);
319-
}
309+
message.getMessageBody().put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY, AggregationScript.INNER_AGGREGATION_COMPUTE_SINGLE);
310+
320311
FunctionContext context = new FunctionContext(message);
321312
windowAccScript.doMessage(message, context);
322313
} else if (executor instanceof FunctionScript) {
323314
FunctionContext context = new FunctionContext(message);
324315
((FunctionScript) executor).doMessage(message, context);
325316
}
326317
}
327-
//
318+
328319
computedColumnResult.put(computedColumn, message.getMessageBody().get(computedColumn));
329320
}
330321
calProjectColumn(window, message);
@@ -346,9 +337,6 @@ protected void calProjectColumn(AbstractWindow window, IMessage message) {
346337
}
347338

348339

349-
/**
350-
* merge the group which has the same group by value and different split id
351-
*/
352340
public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowValue> valueList) {
353341
WindowValue lastWindowValue = new WindowValue(valueList.get(0));
354342
lastWindowValue.setComputedColumnResult(valueList.get(0).getComputedColumnResult());
@@ -363,17 +351,17 @@ public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowVal
363351
for (FunctionExecutor info : executorList) {
364352
String column = info.getColumn();
365353
IStreamOperator<IMessage, List<IMessage>> engine = info.getExecutor();
354+
366355
if (engine instanceof AggregationScript) {
367356
AggregationScript origin = (AggregationScript) engine;
368357
AggregationScript operator = origin.clone();
358+
369359
if (needMergeComputation) {
370-
message.getMessageBody().put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY,
371-
AggregationScript.INNER_AGGREGATION_COMPUTE_SINGLE);
360+
message.getMessageBody().put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY, AggregationScript.INNER_AGGREGATION_COMPUTE_SINGLE);
372361
operator.setAccumulator(operator.getDirector().createAccumulator());
373362
operator.doMessage(message, context);
374363
} else {
375-
message.getMessageBody().put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY,
376-
AggregationScript.INNER_AGGREGATION_COMPUTE_MULTI);
364+
message.getMessageBody().put(AggregationScript.INNER_AGGREGATION_COMPUTE_KEY, AggregationScript.INNER_AGGREGATION_COMPUTE_MULTI);
377365
List actors = valueList.stream().map(
378366
windowValue -> {
379367
Object accumulator = null;
@@ -394,6 +382,7 @@ public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowVal
394382
operator.doMessage(message, context);
395383
needMergeComputation = true;
396384
}
385+
397386
} else if (engine instanceof FunctionScript) {
398387
FunctionScript theScript = (FunctionScript) engine;
399388
String[] parameters = theScript.getDependentParameters();
@@ -409,11 +398,11 @@ public static WindowValue mergeWindowValue(AbstractWindow window, List<WindowVal
409398
}
410399
}
411400
}
401+
412402
if (message.getMessageBody().containsKey(computedColumn)) {
413403
lastWindowValue.computedColumnResult.put(computedColumn, message.getMessageBody().get(computedColumn));
414404
} else if (!needMergeComputation) {
415-
lastWindowValue.computedColumnResult.put(computedColumn,
416-
valueList.get(0).computedColumnResult.get(computedColumn));
405+
lastWindowValue.computedColumnResult.put(computedColumn, valueList.get(0).computedColumnResult.get(computedColumn));
417406
}
418407
}
419408
// valueList.stream().map(value -> lastWindowValue.count += value.getCount());

0 commit comments

Comments
 (0)