Skip to content

Commit f0d79a4

Browse files
authored
Merge pull request #70 from speak2me/commit_20210922
#73 #74 session window and count(distinct) implementation
2 parents 6a3ea18 + 0b1eac6 commit f0d79a4

File tree

51 files changed

+2877
-439
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2877
-439
lines changed

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,53 @@ public WindowStream count(String asName) {
5959
return this;
6060
}
6161

62+
/**
63+
* Distinct operator: registers the select expression {@code <asName>=distinct(<fieldName>)} on the window.
64+
*
65+
* @param fieldName name of the field placed inside {@code distinct(...)}
66+
* @param asName    output name; used both as the select-map key and as the left-hand side of the expression
67+
* @return this stream, so that further operators can be chained
68+
*/
69+
public WindowStream distinct(String fieldName, String asName) {
70+
window.getSelectMap().put(asName, asName + "=distinct(" + fieldName + ")");
71+
return this;
72+
}
73+
74+
/**
75+
* Count-distinct operator: registers two chained select expressions under {@code asName},
* a {@code distinct(fieldName)} stored in an intermediate name, followed by a {@code count(...)}
* over that intermediate name. The two expressions are joined with {@code ';'}.
76+
*
77+
* @param fieldName name of the field whose distinct values are counted
78+
* @param asName    output name; used as the select-map key and as the left-hand side of the count expression
79+
* @return this stream, so that further operators can be chained
80+
*/
81+
public WindowStream count_distinct(String fieldName, String asName) {
// Intermediate name derived from both arguments: "__<fieldName>_distinct_<asName>__".
82+
String distinctName = "__" + fieldName + "_distinct_" + asName + "__";
83+
String prefix = distinctName + "=distinct(" + fieldName + ")";
84+
String suffix = asName + "=count(" + distinctName + ")";
// Both expressions are stored as a single map value, separated by ';'.
85+
window.getSelectMap().put(asName, prefix + ";" + suffix);
86+
return this;
87+
}
88+
89+
/**
 * Variant of {@code count_distinct} that uses the {@code distinct2} function instead of {@code distinct}.
 * In addition to the field, {@code distinct2} is given the literal arguments
 * {@code HIT_WINDOW_INSTANCE_ID} and {@code SHUFFLE_KEY}; the result is then counted into {@code asName}.
 * NOTE(review): the test in this commit names the result "BasedRocksDB", so this is presumably a
 * storage-backed implementation; confirm against the distinct2 function.
 *
 * @param fieldName name of the field whose distinct values are counted
 * @param asName    output name; used as the select-map key and as the left-hand side of the count expression
 * @return this stream, so that further operators can be chained
 */
public WindowStream count_distinct_2(String fieldName, String asName) {
// Intermediate name derived from both arguments: "__<fieldName>_distinct_<asName>__".
90+
String distinctName = "__" + fieldName + "_distinct_" + asName + "__";
91+
String prefix = distinctName + "=distinct2(" + fieldName + ",HIT_WINDOW_INSTANCE_ID,SHUFFLE_KEY)";
92+
String suffix = asName + "=count(" + distinctName + ")";
// Both expressions are stored as a single map value, separated by ';'.
93+
window.getSelectMap().put(asName, prefix + ";" + suffix);
94+
return this;
95+
}
96+
97+
/**
98+
* Count-distinct operator for large data volumes that tolerates a small error rate.
* Registers the single select expression {@code <asName>=count_distinct(<fieldName>)}.
99+
*
100+
* @param fieldName name of the field whose distinct values are counted
101+
* @param asName    output name; used both as the select-map key and as the left-hand side of the expression
102+
* @return this stream, so that further operators can be chained
103+
*/
104+
public WindowStream count_distinct_large(String fieldName, String asName) {
105+
window.getSelectMap().put(asName, asName + "=count_distinct(" + fieldName + ")");
106+
return this;
107+
}
108+
62109
/**
63110
* 做min算子
64111
*

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/SessionWindow.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,22 @@
2020
public class SessionWindow {
2121
/**
2222
* Creates the description of a session window.
23+
*
2324
* @param time the session timeout stored on the returned {@link WindowInfo}
2425
* @return a {@link WindowInfo} whose type is {@code WindowInfo.SESSION_WINDOW}
2526
*/
26-
public static WindowInfo of(Time time){
27-
WindowInfo windowInfo=new WindowInfo();
27+
public static WindowInfo of(Time time) {
28+
WindowInfo windowInfo = new WindowInfo();
2829
windowInfo.setType(WindowInfo.SESSION_WINDOW);
29-
windowInfo.setWindowSize(time);
30+
windowInfo.setSessionTimeout(time);
31+
return windowInfo;
32+
}
33+
34+
/**
 * Creates the description of a session window and also records the name of the time field.
 *
 * @param time      the session timeout stored on the returned {@link WindowInfo}
 * @param timeField name of the time field stored on the returned {@link WindowInfo}
 * @return a {@link WindowInfo} whose type is {@code WindowInfo.SESSION_WINDOW}
 */
public static WindowInfo of(Time time, String timeField) {
35+
WindowInfo windowInfo = new WindowInfo();
36+
windowInfo.setType(WindowInfo.SESSION_WINDOW);
37+
windowInfo.setSessionTimeout(time);
38+
windowInfo.setTimeField(timeField);
3039
return windowInfo;
3140
}
3241
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/TumblingWindow.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,12 @@ public static WindowInfo of(Time time) {
3030
windowInfo.setWindowSize(time);
3131
return windowInfo;
3232
}
33+
34+
/**
 * Creates the description of a tumbling window and also records the name of the time field.
 *
 * @param time      the window size stored on the returned {@link WindowInfo}
 * @param timeField name of the time field stored on the returned {@link WindowInfo}
 * @return a {@link WindowInfo} whose type is {@code WindowInfo.TUMBLING_WINDOW}
 */
public static WindowInfo of(Time time, String timeField) {
35+
WindowInfo windowInfo = new WindowInfo();
36+
windowInfo.setType(WindowInfo.TUMBLING_WINDOW);
37+
windowInfo.setWindowSize(time);
38+
windowInfo.setTimeField(timeField);
39+
return windowInfo;
40+
}
3341
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.rocketmq.streams.client.transform.window;
1818

1919
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
20-
import org.apache.rocketmq.streams.window.operator.impl.SessionWindow;
20+
import org.apache.rocketmq.streams.window.operator.impl.SessionOperator;
2121
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
2222

2323
/**
@@ -26,10 +26,16 @@
2626
public class WindowInfo {
2727
// NOTE(review): the window-type constants are public static but not final, so callers could reassign them.
public static int HOPPING_WINDOW = 1;// hopping (sliding) window
2828
public static int TUMBLING_WINDOW = 2;// tumbling window
29-
public static int SESSION_WINDOW = 23;
29+
public static int SESSION_WINDOW = 3;
3030
protected int type;// window type: one of the constants above (hopping, tumbling, session)
3131
protected Time windowSize;// window size
3232
protected Time windowSlide;// slide size
33+
/**
34+
* Timeout of a session window; createWindow() passes its value to the SessionOperator constructor.
35+
*/
36+
protected Time sessionTimeout;
37+
38+
// Name of the time field; createWindow() forwards it to window.setTimeFieldName(...).
protected String timeField;
3339

3440
/**
3541
* 创建窗口
@@ -48,12 +54,11 @@ public AbstractWindow createWindow() {
4854
window.setTimeUnitAdjust(1);
4955
window.setSizeInterval(windowSize.getValue());
5056
} else if (type == SESSION_WINDOW) {
51-
window = new SessionWindow();
52-
window.setTimeUnitAdjust(1);
53-
window.setSizeInterval(windowSize.getValue());
57+
window = new SessionOperator(sessionTimeout.getValue());
5458
} else {
5559
throw new RuntimeException("can not support the type ,expect 1,2,3。actual is " + type);
5660
}
61+
window.setTimeFieldName(timeField);
5762
return window;
5863
}
5964

@@ -80,4 +85,20 @@ public Time getWindowSlide() {
8085
/**
 * Sets the slide size of the window.
 *
 * @param windowSlide the slide size to store
 */
public void setWindowSlide(Time windowSlide) {
8186
this.windowSlide = windowSlide;
8287
}
88+
89+
/**
 * Returns the session-window timeout.
 *
 * @return the stored session timeout; may be {@code null} if it was never set
 */
public Time getSessionTimeout() {
90+
return sessionTimeout;
91+
}
92+
93+
/**
 * Sets the session-window timeout.
 * NOTE(review): createWindow() dereferences this value for SESSION_WINDOW, so it must be set
 * before a session window is created.
 *
 * @param sessionTimeout the session timeout to store
 */
public void setSessionTimeout(Time sessionTimeout) {
94+
this.sessionTimeout = sessionTimeout;
95+
}
96+
97+
/**
 * Returns the name of the time field.
 *
 * @return the stored time field name; may be {@code null} if it was never set
 */
public String getTimeField() {
98+
return timeField;
99+
}
100+
101+
/**
 * Sets the name of the time field; createWindow() forwards it to the created window.
 *
 * @param timeField the time field name to store
 */
public void setTimeField(String timeField) {
102+
this.timeField = timeField;
103+
}
83104
}

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java

Lines changed: 203 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,21 @@
1818
package org.apache.rocketmq.streams.client;
1919

2020
import com.alibaba.fastjson.JSONObject;
21+
import java.io.File;
2122
import java.io.Serializable;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.apache.commons.io.FileUtils;
28+
import org.apache.commons.lang3.tuple.Pair;
29+
import org.apache.rocketmq.streams.client.transform.window.SessionWindow;
2230
import org.apache.rocketmq.streams.client.transform.window.Time;
2331
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
2432
import org.apache.rocketmq.streams.common.functions.ForEachFunction;
2533
import org.apache.rocketmq.streams.common.functions.MapFunction;
34+
import org.apache.rocketmq.streams.common.utils.DateUtil;
35+
import org.junit.Assert;
2636
import org.junit.Test;
2737

2838
public class WindowTest implements Serializable {
@@ -31,7 +41,7 @@ public class WindowTest implements Serializable {
3141
public void testWindow() {
3242
StreamBuilder.dataStream("namespace", "name")
3343
.fromFile("/Users/duheng/project/opensource/sls_100.txt", false)
34-
.map((MapFunction<JSONObject, String>)message -> JSONObject.parseObject(message))
44+
.map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
3545
.window(TumblingWindow.of(Time.seconds(5)))
3646
.groupBy("ProjectName", "LogStore")
3747
.setLocalStorageOnly(true)
@@ -81,4 +91,196 @@ public void foreach(JSONObject o) {
8191
// }).toPrint().start();
8292
// }
8393

94+
/**
 * End-to-end check of the session window: writes five events for three users to a temp file,
 * runs them through a 5-second session window keyed on "user" (time field "time") that sums
 * "flag" into "count", writes the output to a second temp file, then reads that file back and
 * asserts on the number and boundaries of the sessions per user.
 *
 * NOTE(review): the test sleeps for a fixed 60 seconds before reading the result file.
 */
@Test
95+
public void testSession() {
96+
//dataset
97+
List<String> behaviorList = new ArrayList<>();
98+
99+
// userA: two events one second apart (10:00:00 and 10:00:01).
JSONObject userA = new JSONObject();
100+
userA.put("time", DateUtil.parse("2021-09-09 10:00:00"));
101+
userA.put("user", "userA");
102+
userA.put("movie", "movie001");
103+
userA.put("flag", 1);
104+
behaviorList.add(userA.toJSONString());
105+
106+
userA = new JSONObject();
107+
userA.put("time", DateUtil.parse("2021-09-09 10:00:01"));
108+
userA.put("user", "userA");
109+
userA.put("movie", "movie002");
110+
userA.put("flag", 1);
111+
behaviorList.add(userA.toJSONString());
112+
113+
// userB: a single event.
JSONObject userB = new JSONObject();
114+
userB.put("time", DateUtil.parse("2021-09-09 10:00:00"));
115+
userB.put("user", "userB");
116+
userB.put("movie", "movie003");
117+
userB.put("flag", 1);
118+
behaviorList.add(userB.toJSONString());
119+
120+
// userC: two events six seconds apart (10:00:00 and 10:00:06), i.e. further apart than the 5s timeout.
JSONObject userC = new JSONObject();
121+
userC.put("time", DateUtil.parse("2021-09-09 10:00:00"));
122+
userC.put("user", "userC");
123+
userC.put("movie", "movie004");
124+
userC.put("flag", 1);
125+
behaviorList.add(userC.toJSONString());
126+
127+
userC = new JSONObject();
128+
userC.put("time", DateUtil.parse("2021-09-09 10:00:06"));
129+
userC.put("user", "userC");
130+
userC.put("movie", "movie005");
131+
userC.put("flag", 1);
132+
behaviorList.add(userC.toJSONString());
133+
134+
// NOTE(review): if creating or writing the temp file fails, the exception is only printed and
// dataFile stays null, which leads to a NullPointerException at dataFile.getAbsolutePath() below.
File dataFile = null;
135+
try {
136+
dataFile = File.createTempFile("behavior", ".txt");
137+
FileUtils.writeLines(dataFile, behaviorList);
138+
} catch (Exception e) {
139+
e.printStackTrace();
140+
}
141+
142+
// NOTE(review): same pattern as above; resultFile may still be null after this block.
File resultFile = null;
143+
try {
144+
resultFile = File.createTempFile("behavior.txt", ".session");
145+
} catch (Exception e) {
146+
e.printStackTrace();
147+
}
148+
149+
// Pipeline: file -> JSON -> 5s session window on "time" -> group by "user" -> sum("flag") as "count" -> file.
StreamBuilder.dataStream("namespace", "session_test")
150+
.fromFile(dataFile.getAbsolutePath(), false)
151+
.map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
152+
.window(SessionWindow.of(Time.seconds(5), "time"))
153+
.groupBy("user")
154+
.setLocalStorageOnly(true)
155+
.sum("flag", "count")
156+
.toDataSteam()
157+
.toFile(resultFile.getAbsolutePath()).start(true);
158+
159+
try {
160+
Thread.sleep(1 * 60 * 1000);
161+
List<String> sessionList = FileUtils.readLines(resultFile, "UTF-8");
// user -> list of ((start_time, end_time), count), one entry per output line.
162+
Map<String, List<Pair<Pair<String, String>, Integer>>> sessionMap = new HashMap<>(4);
163+
for (String line : sessionList) {
164+
JSONObject object = JSONObject.parseObject(line);
165+
String user = object.getString("user");
166+
String startTime = object.getString("start_time");
167+
String endTime = object.getString("end_time");
168+
Integer value = object.getIntValue("count");
169+
if (!sessionMap.containsKey(user)) {
170+
sessionMap.put(user, new ArrayList<>());
171+
}
172+
sessionMap.get(user).add(Pair.of(Pair.of(startTime, endTime), value));
173+
}
// Expected values below presumably follow "end_time = last event time + 5s timeout"; confirm
// against the session operator. The parsed "count" value is collected but never asserted.
174+
Assert.assertEquals(3, sessionMap.size());
175+
Assert.assertEquals(1, sessionMap.get("userA").size());
176+
Assert.assertEquals("2021-09-09 10:00:06", sessionMap.get("userA").get(0).getLeft().getRight());
177+
Assert.assertEquals(2, sessionMap.get("userC").size());
178+
Assert.assertEquals("2021-09-09 10:00:05", sessionMap.get("userC").get(0).getLeft().getRight());
179+
Assert.assertEquals("2021-09-09 10:00:06", sessionMap.get("userC").get(1).getLeft().getLeft());
180+
Assert.assertEquals(1, sessionMap.get("userB").size());
181+
182+
// NOTE(review): an exception thrown while sleeping or reading the result file is only printed,
// so the test can finish green without having executed any assertion. Assertion failures are
// Errors, not Exceptions, and therefore still propagate.
} catch (Exception e) {
183+
e.printStackTrace();
184+
} finally {
// deleteOnExit() only schedules deletion for JVM shutdown; both calls throw
// NullPointerException if the corresponding temp file could not be created.
185+
dataFile.deleteOnExit();
186+
resultFile.deleteOnExit();
187+
}
188+
189+
}
190+
191+
/**
 * End-to-end check of the three count-distinct operators: writes six page-visit events for three
 * users to a temp file, runs them through a 5-minute tumbling window keyed on "user" (time field
 * "time"), computes "uv" (count_distinct), "uv_large" (count_distinct_large) and "uv_2"
 * (count_distinct_2) over "page", then reads the output file and asserts that the three values
 * agree on every line and match the expected distinct counts.
 *
 * NOTE(review): the test sleeps for a fixed six minutes before reading the result file.
 */
@Test
192+
public void testCountDistinct() {
193+
//dataset
194+
List<String> behaviorList = new ArrayList<>();
195+
196+
// userA: three events, two distinct pages (alibaba-inc.com twice, sina.com once).
JSONObject userA = new JSONObject();
197+
userA.put("time", DateUtil.parse("2021-09-09 10:00:00"));
198+
userA.put("user", "userA");
199+
userA.put("page", "alibaba-inc.com");
200+
behaviorList.add(userA.toJSONString());
201+
202+
userA = new JSONObject();
203+
userA.put("time", DateUtil.parse("2021-09-09 10:01:00"));
204+
userA.put("user", "userA");
205+
userA.put("page", "sina.com");
206+
behaviorList.add(userA.toJSONString());
207+
208+
userA = new JSONObject();
209+
userA.put("time", DateUtil.parse("2021-09-09 10:03:00"));
210+
userA.put("user", "userA");
211+
userA.put("page", "alibaba-inc.com");
212+
behaviorList.add(userA.toJSONString());
213+
214+
// userB: one event, one page.
JSONObject userB = new JSONObject();
215+
userB.put("time", DateUtil.parse("2021-09-09 10:00:00"));
216+
userB.put("user", "userB");
217+
userB.put("page", "sohu.com");
218+
behaviorList.add(userB.toJSONString());
219+
220+
// userC: two events on the same page.
JSONObject userC = new JSONObject();
221+
userC.put("time", DateUtil.parse("2021-09-09 10:00:00"));
222+
userC.put("user", "userC");
223+
userC.put("page", "qq.com");
224+
behaviorList.add(userC.toJSONString());
225+
226+
userC = new JSONObject();
227+
userC.put("time", DateUtil.parse("2021-09-09 10:03:06"));
228+
userC.put("user", "userC");
229+
userC.put("page", "qq.com");
230+
behaviorList.add(userC.toJSONString());
231+
232+
// NOTE(review): if creating or writing the temp file fails, the exception is only printed and
// dataFile stays null, which leads to a NullPointerException at dataFile.getAbsolutePath() below.
File dataFile = null;
233+
try {
234+
dataFile = File.createTempFile("behavior", ".txt");
235+
FileUtils.writeLines(dataFile, behaviorList);
236+
} catch (Exception e) {
237+
e.printStackTrace();
238+
}
239+
240+
// NOTE(review): same pattern as above; the ".session" suffix looks copied from testSession.
File resultFile = null;
241+
try {
242+
resultFile = File.createTempFile("behavior.txt", ".session");
243+
} catch (Exception e) {
244+
e.printStackTrace();
245+
}
246+
247+
// Pipeline: file -> JSON -> 5min tumbling window on "time" -> group by "user" -> three distinct counts -> file.
StreamBuilder.dataStream("namespace", "count_distinct_test")
248+
.fromFile(dataFile.getAbsolutePath(), false)
249+
.map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
250+
.window(TumblingWindow.of(Time.minutes(5), "time"))
251+
.groupBy("user")
252+
.setLocalStorageOnly(true)
253+
.count_distinct("page", "uv")
254+
.count_distinct_large("page", "uv_large")
255+
.count_distinct_2("page","uv_2")
256+
.toDataSteam()
257+
.toFile(resultFile.getAbsolutePath()).start(true);
258+
259+
try {
260+
Thread.sleep(6 * 60 * 1000);
261+
List<String> sessionList = FileUtils.readLines(resultFile, "UTF-8");
// user -> "uv" value; a later output line for the same user overwrites the earlier one.
262+
Map<String, Integer> statisticMap = new HashMap<>(4);
263+
for (String line : sessionList) {
264+
JSONObject object = JSONObject.parseObject(line);
265+
String user = object.getString("user");
266+
Integer userVisitCount = object.getInteger("uv");
267+
Integer userVisitCountBasedRocksDB = object.getInteger("uv_2");
268+
Integer userVisitCountLarge = object.getInteger("uv_large");
// NOTE(review): count_distinct_large is documented as tolerating a small error rate, yet exact
// equality is asserted here; presumably safe only because the data set is tiny.
269+
Assert.assertEquals(userVisitCount, userVisitCountLarge);
270+
Assert.assertEquals(userVisitCount, userVisitCountBasedRocksDB);
271+
statisticMap.put(user, userVisitCount);
272+
}
273+
Assert.assertEquals(3, statisticMap.size());
274+
Assert.assertEquals(2, statisticMap.get("userA").longValue());
275+
Assert.assertEquals(1, statisticMap.get("userB").longValue());
276+
Assert.assertEquals(1, statisticMap.get("userC").longValue());
277+
278+
// NOTE(review): an exception thrown while sleeping or reading the result file is only printed,
// so the test can finish green without having executed any assertion. Assertion failures are
// Errors, not Exceptions, and therefore still propagate.
} catch (Exception e) {
279+
e.printStackTrace();
280+
} finally {
// deleteOnExit() only schedules deletion for JVM shutdown; both calls throw
// NullPointerException if the corresponding temp file could not be created.
281+
dataFile.deleteOnExit();
282+
resultFile.deleteOnExit();
283+
}
284+
}
285+
84286
}

rocketmq-streams-commons/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@
3737
<version>3.11</version>
3838
</dependency>
3939

40+
<!-- HyperLogLog, used for distinct-value (DV) computation -->
41+
<dependency>
42+
<groupId>net.agkn</groupId>
43+
<artifactId>hll</artifactId>
44+
<version>1.6.0</version>
45+
</dependency>
46+
4047
<!-- test dependencies -->
4148
<dependency>
4249
<groupId>junit</groupId>

0 commit comments

Comments
 (0)