Skip to content

Commit 59e3db0

Browse files
committed
Merge branch 'develop' into develop_avg
2 parents 040a9df + 608ccae commit 59e3db0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1463
-423
lines changed

core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.rocketmq.streams.core.common;
2020

21+
import java.nio.charset.StandardCharsets;
22+
2123
public class Constant {
2224

2325
public static final String SHUFFLE_KEY_CLASS_NAME = "shuffle.key.class.name";
@@ -28,12 +30,8 @@ public class Constant {
2830

2931
public static final String SHUFFLE_TOPIC_SUFFIX = "-shuffleTopic";
3032

31-
public static final String TIME_TYPE = "timeType";
32-
3333
public static final String SKIP_DATA_ERROR = "skip_data_error";
3434

35-
public static final String ALLOW_LATENESS_MILLISECOND = "allowLatenessMillisecond";
36-
3735
public static final String SPLIT = "@";
3836

3937
public static final String EMPTY_BODY = "empty_body";
@@ -52,4 +50,5 @@ public class Constant {
5250

5351
public static final String STATIC_TOPIC_BROKER_NAME = "__syslo__global__";
5452

53+
public static final byte[] WATERMARK_KEY = "watermark_key".getBytes(StandardCharsets.UTF_8);
5554
}

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowab
6767
super.preProcess(context);
6868
this.stateStore = super.waitStateReplay();
6969

70-
String stateTopicName = getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
71-
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, getSourceBrokerName(), getSourceQueueId());
70+
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
71+
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
7272
}
7373

7474
@Override

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowab
6868
super.preProcess(context);
6969
this.stateStore = super.waitStateReplay();
7070

71-
String stateTopicName = getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
72-
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, getSourceBrokerName(), getSourceQueueId());
71+
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
72+
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
7373
}
7474

7575
@Override

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public JoinStreamAggregateProcessor(String name, JoinType joinType, ValueJoinAct
7070
public void preProcess(StreamContext<Object> context) throws RecoverStateStoreThrowable {
7171
super.preProcess(context);
7272
this.stateStore = super.waitStateReplay();
73-
String stateTopicName = getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
74-
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, getSourceBrokerName(), getSourceQueueId());
73+
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
74+
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
7575
}
7676

7777
@Override

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java

Lines changed: 29 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.rocketmq.common.message.MessageQueue;
2020
import org.apache.rocketmq.streams.core.common.Constant;
21+
import org.apache.rocketmq.streams.core.exception.RStreamsException;
2122
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
2223
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
2324
import org.apache.rocketmq.streams.core.metadata.Data;
@@ -33,12 +34,14 @@
3334
import org.apache.rocketmq.streams.core.window.WindowStore;
3435
import org.apache.rocketmq.streams.core.util.Pair;
3536
import org.apache.rocketmq.streams.core.util.Utils;
37+
import org.apache.rocketmq.streams.core.window.fire.JoinWindowFire;
3638
import org.slf4j.Logger;
3739
import org.slf4j.LoggerFactory;
3840

3941
import java.util.Comparator;
4042
import java.util.List;
4143
import java.util.Properties;
44+
import java.util.concurrent.atomic.AtomicReference;
4245
import java.util.function.Supplier;
4346

4447
public class JoinWindowAggregateSupplier<K, V1, V2, OUT> implements Supplier<Processor<? super OUT>> {
@@ -84,31 +87,49 @@ public void preProcess(StreamContext<Object> context) throws RecoverStateStoreTh
8487
super.preProcess(context);
8588
leftWindowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
8689
rightWindowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
87-
String stateTopicName = getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
88-
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, getSourceBrokerName(), getSourceQueueId());
89-
}
9090

91+
this.idleWindowScaner = context.getDefaultWindowScaner();
92+
93+
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
94+
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
95+
96+
this.joinWindowFire = new JoinWindowFire<>(joinType,
97+
this.stateTopicMessageQueue,
98+
context.copy(),
99+
joinAction,
100+
leftWindowStore,
101+
rightWindowStore,
102+
this::watermark);
103+
}
91104

92105
@Override
93106
public void process(Object data) throws Throwable {
94107

95108
Object key = this.context.getKey();
96109
long time = this.context.getDataTime();
97110
Properties header = this.context.getHeader();
98-
long watermark = this.context.getWatermark();
111+
long watermark = this.watermark(time, stateTopicMessageQueue);
99112
WindowInfo.JoinStream stream = (WindowInfo.JoinStream) header.get(Constant.STREAM_TAG);
100113

101114
if (time < watermark) {
102115
//已经触发,丢弃数据
116+
logger.warn("discard data:[{}], window has been fired. maxFiredWindowEnd:{}, time of data:{}, watermark:{}",
117+
data, watermark, watermark, time);
103118
return;
104119
}
105120

106121
StreamType streamType = stream.getStreamType();
107-
122+
if (streamType == null) {
123+
String format = String.format("StreamType is empty, data:%s", data);
124+
throw new IllegalStateException(format);
125+
}
108126

109127
store(key, data, time, streamType);
110128

111-
fire(watermark, streamType);
129+
List<WindowKey> fire = this.joinWindowFire.fire(this.name, watermark, streamType);
130+
for (WindowKey windowKey : fire) {
131+
this.idleWindowScaner.removeWindowKey(windowKey);
132+
}
112133
}
113134

114135

@@ -124,110 +145,16 @@ private void store(Object key, Object data, long time, StreamType streamType) th
124145
case LEFT_STREAM:
125146
WindowState<K, V1> leftState = new WindowState<>((K) key, (V1) data, time);
126147
this.leftWindowStore.put(stateTopicMessageQueue, windowKey, leftState);
148+
this.idleWindowScaner.putJoinWindowCallback(windowKey, joinWindowFire);
127149
break;
128150
case RIGHT_STREAM:
129151
WindowState<K, V2> rightState = new WindowState<>((K) key, (V2) data, time);
130152
this.rightWindowStore.put(stateTopicMessageQueue, windowKey, rightState);
153+
this.idleWindowScaner.putJoinWindowCallback(windowKey, joinWindowFire);
131154
break;
132155
}
133156
}
134-
}
135-
136-
private void fire(long watermark, StreamType streamType) throws Throwable {
137-
String leftWindow = Utils.buildKey(this.name, StreamType.LEFT_STREAM.name());
138-
WindowKey leftWindowKey = new WindowKey(leftWindow, null, watermark, 0L);
139-
List<Pair<WindowKey, WindowState<K, V1>>> leftPairs = this.leftWindowStore.searchLessThanWatermark(leftWindowKey);
140-
141-
String rightWindow = Utils.buildKey(this.name, StreamType.RIGHT_STREAM.name());
142-
WindowKey rightWindowKey = new WindowKey(rightWindow, null, watermark, 0L);
143-
List<Pair<WindowKey, WindowState<K, V2>>> rightPairs = this.rightWindowStore.searchLessThanWatermark(rightWindowKey);
144-
145-
146-
if (leftPairs.size() == 0 && rightPairs.size() == 0) {
147-
return;
148-
}
149-
150-
leftPairs.sort(Comparator.comparing(pair -> {
151-
WindowKey key = pair.getKey();
152-
return key.getWindowEnd();
153-
}));
154-
rightPairs.sort(Comparator.comparing(pair -> {
155-
WindowKey key = pair.getKey();
156-
return key.getWindowEnd();
157-
}));
158-
159-
switch (joinType) {
160-
case INNER_JOIN:
161-
//匹配上才触发
162-
for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
163-
String leftPrefix = leftPair.getKey().getKeyAndWindow();
164-
165-
for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
166-
String rightPrefix = rightPair.getKey().getKeyAndWindow();
167-
168-
//相同window中相同key,聚合
169-
if (leftPrefix.equals(rightPrefix)) {
170-
//do fire
171-
V1 o1 = leftPair.getValue().getValue();
172-
V2 o2 = rightPair.getValue().getValue();
173-
174-
OUT out = this.joinAction.apply(o1, o2);
175-
176-
Properties header = this.context.getHeader();
177-
header.put(Constant.WINDOW_START_TIME, leftPair.getKey().getWindowStart());
178-
header.put(Constant.WINDOW_END_TIME, leftPair.getKey().getWindowEnd());
179-
Data<K, OUT> result = new Data<>(this.context.getKey(), out, this.context.getDataTime(), header);
180-
Data<K, Object> convert = super.convert(result);
181-
this.context.forward(convert);
182-
}
183-
}
184-
}
185-
break;
186-
case LEFT_JOIN:
187-
switch (streamType) {
188-
case LEFT_STREAM:
189-
//左流全部触发,不管右流匹配上没
190-
for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
191-
String leftPrefix = leftPair.getKey().getKeyAndWindow();
192-
Pair<WindowKey, WindowState<K, V2>> targetPair = null;
193-
for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
194-
if (rightPair.getKey().getKeyAndWindow().equals(leftPrefix)) {
195-
targetPair = rightPair;
196-
break;
197-
}
198-
}
199-
200-
//fire
201-
V1 o1 = leftPair.getValue().getValue();
202-
V2 o2 = null;
203-
if (targetPair != null) {
204-
o2 = targetPair.getValue().getValue();
205-
}
206-
207-
OUT out = this.joinAction.apply(o1, o2);
208-
Properties header = this.context.getHeader();
209-
header.put(Constant.WINDOW_START_TIME, leftPair.getKey().getWindowStart());
210-
header.put(Constant.WINDOW_END_TIME, leftPair.getKey().getWindowEnd());
211-
Data<K, OUT> result = new Data<>(this.context.getKey(), out, this.context.getDataTime(), header);
212-
Data<K, Object> convert = super.convert(result);
213-
this.context.forward(convert);
214-
}
215-
break;
216-
case RIGHT_STREAM:
217-
//do nothing.
218-
}
219-
break;
220-
}
221-
222-
//删除状态
223-
for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
224-
this.leftWindowStore.deleteByKey(leftPair.getKey());
225-
}
226157

227-
for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
228-
this.rightWindowStore.deleteByKey(rightPair.getKey());
229-
}
230158
}
231-
232159
}
233160
}

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,10 @@ public interface SourceProcessor<K, V> extends Processor<V> {
4444
Pair<K, V> deserialize(String keyClass, String valueClass, byte[] data) throws DeserializeThrowable;
4545

4646
long getTimestamp(MessageExt originData, TimeType timeType);
47-
48-
default long getWatermark(long time, Long delay) {
49-
return -1;
50-
}
5147
}
5248

5349
private class SourceProcessorImpl extends AbstractProcessor<V> implements SourceProcessor<K, V> {
5450
private KeyValueDeserializer<K, V> deserializer;
55-
private long maxTimestamp = Long.MIN_VALUE;
56-
5751

5852
public SourceProcessorImpl(KeyValueDeserializer<K, V> deserializer) {
5953
this.deserializer = deserializer;
@@ -71,7 +65,6 @@ public Pair<K, V> deserialize(String keyClass, String valueClass, byte[] data) t
7165

7266
@Override
7367
public long getTimestamp(MessageExt originData, TimeType timeType) {
74-
7568
if (timeType == null) {
7669
return System.currentTimeMillis();
7770
} else if (timeType == TimeType.EVENT_TIME) {
@@ -83,13 +76,6 @@ public long getTimestamp(MessageExt originData, TimeType timeType) {
8376
}
8477
}
8578

86-
@Override
87-
public long getWatermark(long time, Long delay) {
88-
maxTimestamp = Math.max(time, this.maxTimestamp);
89-
long delayTimestamp = delay == null ? 0L : delay;
90-
91-
return maxTimestamp - delayTimestamp;
92-
}
9379

9480
@Override
9581
public void process(V data) throws Throwable {

core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/TimestampSelectorSupplier.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.rocketmq.streams.core.metadata.Data;
2121
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
2222
import org.apache.rocketmq.streams.core.running.Processor;
23+
import org.apache.rocketmq.streams.core.running.StreamContext;
2324

2425
import java.util.function.Supplier;
2526

@@ -46,8 +47,12 @@ public TimestampSelector(ValueMapperAction<T, Long> valueMapperAction) {
4647
@Override
4748
public void process(T data) throws Throwable {
4849
Long timestamp = this.valueMapperAction.convert(data);
49-
Data<Object, T> result = new Data<>(this.context.getKey(), data, timestamp, this.context.getHeader());
50-
this.context.forward(result);
50+
51+
StreamContext<T> streamContext = this.context;
52+
53+
//override the timestamp of data
54+
Data<Object, T> result = new Data<>(streamContext.getKey(), data, timestamp, streamContext.getHeader());
55+
streamContext.forward(result);
5156
}
5257
}
5358
}

0 commit comments

Comments
 (0)