Skip to content

#73 #74 session window and count(distinct) implementation #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,53 @@ public WindowStream count(String asName) {
return this;
}

/**
* distinct算子
*
* @param fieldName
* @param asName
* @return
*/
public WindowStream distinct(String fieldName, String asName) {
window.getSelectMap().put(asName, asName + "=distinct(" + fieldName + ")");
return this;
}

/**
* count_distinct算子
*
* @param fieldName
* @param asName
* @return
*/
public WindowStream count_distinct(String fieldName, String asName) {
String distinctName = "__" + fieldName + "_distinct_" + asName + "__";
String prefix = distinctName + "=distinct(" + fieldName + ")";
String suffix = asName + "=count(" + distinctName + ")";
window.getSelectMap().put(asName, prefix + ";" + suffix);
return this;
}

public WindowStream count_distinct_2(String fieldName, String asName) {
String distinctName = "__" + fieldName + "_distinct_" + asName + "__";
String prefix = distinctName + "=distinct2(" + fieldName + ",HIT_WINDOW_INSTANCE_ID,SHUFFLE_KEY)";
String suffix = asName + "=count(" + distinctName + ")";
window.getSelectMap().put(asName, prefix + ";" + suffix);
return this;
}

/**
* count_distinct算子(数据量大,容忍较少错误率)
*
* @param fieldName
* @param asName
* @return
*/
public WindowStream count_distinct_large(String fieldName, String asName) {
window.getSelectMap().put(asName, asName + "=count_distinct(" + fieldName + ")");
return this;
}

/**
* 做min算子
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@
public class SessionWindow {
/**
* 滑动窗口信息
*
* @param time
* @return
*/
public static WindowInfo of(Time time){
WindowInfo windowInfo=new WindowInfo();
public static WindowInfo of(Time time) {
WindowInfo windowInfo = new WindowInfo();
windowInfo.setType(WindowInfo.SESSION_WINDOW);
windowInfo.setWindowSize(time);
windowInfo.setSessionTimeout(time);
return windowInfo;
}

public static WindowInfo of(Time time, String timeField) {
WindowInfo windowInfo = new WindowInfo();
windowInfo.setType(WindowInfo.SESSION_WINDOW);
windowInfo.setSessionTimeout(time);
windowInfo.setTimeField(timeField);
return windowInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public static WindowInfo of(Time time) {
windowInfo.setWindowSize(time);
return windowInfo;
}

public static WindowInfo of(Time time, String timeField) {
WindowInfo windowInfo = new WindowInfo();
windowInfo.setType(WindowInfo.TUMBLING_WINDOW);
windowInfo.setWindowSize(time);
windowInfo.setTimeField(timeField);
return windowInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.rocketmq.streams.client.transform.window;

import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.window.operator.impl.SessionWindow;
import org.apache.rocketmq.streams.window.operator.impl.SessionOperator;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;

/**
Expand All @@ -26,10 +26,16 @@
public class WindowInfo {
public static int HOPPING_WINDOW = 1;//滑动窗口
public static int TUMBLING_WINDOW = 2;//滚动窗口
public static int SESSION_WINDOW = 23;
public static int SESSION_WINDOW = 3;
protected int type;//window类型 hopping,Tumbling
protected Time windowSize;//窗口大小
protected Time windowSlide;//滑动大小
/**
* 会话窗口的超时时间
*/
protected Time sessionTimeout;

protected String timeField;

/**
* 创建窗口
Expand All @@ -48,12 +54,11 @@ public AbstractWindow createWindow() {
window.setTimeUnitAdjust(1);
window.setSizeInterval(windowSize.getValue());
} else if (type == SESSION_WINDOW) {
window = new SessionWindow();
window.setTimeUnitAdjust(1);
window.setSizeInterval(windowSize.getValue());
window = new SessionOperator(sessionTimeout.getValue());
} else {
throw new RuntimeException("can not support the type ,expect 1,2,3。actual is " + type);
}
window.setTimeFieldName(timeField);
return window;
}

Expand All @@ -80,4 +85,20 @@ public Time getWindowSlide() {
public void setWindowSlide(Time windowSlide) {
this.windowSlide = windowSlide;
}

public Time getSessionTimeout() {
return sessionTimeout;
}

public void setSessionTimeout(Time sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}

public String getTimeField() {
return timeField;
}

public void setTimeField(String timeField) {
this.timeField = timeField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@
package org.apache.rocketmq.streams.client;

import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.client.transform.window.SessionWindow;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
import org.apache.rocketmq.streams.common.functions.ForEachFunction;
import org.apache.rocketmq.streams.common.functions.MapFunction;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.junit.Assert;
import org.junit.Test;

public class WindowTest implements Serializable {
Expand All @@ -31,7 +41,7 @@ public class WindowTest implements Serializable {
public void testWindow() {
StreamBuilder.dataStream("namespace", "name")
.fromFile("/Users/duheng/project/opensource/sls_100.txt", false)
.map((MapFunction<JSONObject, String>)message -> JSONObject.parseObject(message))
.map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
.window(TumblingWindow.of(Time.seconds(5)))
.groupBy("ProjectName", "LogStore")
.setLocalStorageOnly(true)
Expand Down Expand Up @@ -81,4 +91,196 @@ public void foreach(JSONObject o) {
// }).toPrint().start();
// }

@Test
public void testSession() {
//dataset
List<String> behaviorList = new ArrayList<>();

JSONObject userA = new JSONObject();
userA.put("time", DateUtil.parse("2021-09-09 10:00:00"));
userA.put("user", "userA");
userA.put("movie", "movie001");
userA.put("flag", 1);
behaviorList.add(userA.toJSONString());

userA = new JSONObject();
userA.put("time", DateUtil.parse("2021-09-09 10:00:01"));
userA.put("user", "userA");
userA.put("movie", "movie002");
userA.put("flag", 1);
behaviorList.add(userA.toJSONString());

JSONObject userB = new JSONObject();
userB.put("time", DateUtil.parse("2021-09-09 10:00:00"));
userB.put("user", "userB");
userB.put("movie", "movie003");
userB.put("flag", 1);
behaviorList.add(userB.toJSONString());

JSONObject userC = new JSONObject();
userC.put("time", DateUtil.parse("2021-09-09 10:00:00"));
userC.put("user", "userC");
userC.put("movie", "movie004");
userC.put("flag", 1);
behaviorList.add(userC.toJSONString());

userC = new JSONObject();
userC.put("time", DateUtil.parse("2021-09-09 10:00:06"));
userC.put("user", "userC");
userC.put("movie", "movie005");
userC.put("flag", 1);
behaviorList.add(userC.toJSONString());

File dataFile = null;
try {
dataFile = File.createTempFile("behavior", ".txt");
FileUtils.writeLines(dataFile, behaviorList);
} catch (Exception e) {
e.printStackTrace();
}

File resultFile = null;
try {
resultFile = File.createTempFile("behavior.txt", ".session");
} catch (Exception e) {
e.printStackTrace();
}

StreamBuilder.dataStream("namespace", "session_test")
.fromFile(dataFile.getAbsolutePath(), false)
.map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
.window(SessionWindow.of(Time.seconds(5), "time"))
.groupBy("user")
.setLocalStorageOnly(true)
.sum("flag", "count")
.toDataSteam()
.toFile(resultFile.getAbsolutePath()).start(true);

try {
Thread.sleep(1 * 60 * 1000);
List<String> sessionList = FileUtils.readLines(resultFile, "UTF-8");
Map<String, List<Pair<Pair<String, String>, Integer>>> sessionMap = new HashMap<>(4);
for (String line : sessionList) {
JSONObject object = JSONObject.parseObject(line);
String user = object.getString("user");
String startTime = object.getString("start_time");
String endTime = object.getString("end_time");
Integer value = object.getIntValue("count");
if (!sessionMap.containsKey(user)) {
sessionMap.put(user, new ArrayList<>());
}
sessionMap.get(user).add(Pair.of(Pair.of(startTime, endTime), value));
}
Assert.assertEquals(3, sessionMap.size());
Assert.assertEquals(1, sessionMap.get("userA").size());
Assert.assertEquals("2021-09-09 10:00:06", sessionMap.get("userA").get(0).getLeft().getRight());
Assert.assertEquals(2, sessionMap.get("userC").size());
Assert.assertEquals("2021-09-09 10:00:05", sessionMap.get("userC").get(0).getLeft().getRight());
Assert.assertEquals("2021-09-09 10:00:06", sessionMap.get("userC").get(1).getLeft().getLeft());
Assert.assertEquals(1, sessionMap.get("userB").size());

} catch (Exception e) {
e.printStackTrace();
} finally {
dataFile.deleteOnExit();
resultFile.deleteOnExit();
}

}

@Test
public void testCountDistinct() {
//dataset
List<String> behaviorList = new ArrayList<>();

JSONObject userA = new JSONObject();
userA.put("time", DateUtil.parse("2021-09-09 10:00:00"));
userA.put("user", "userA");
userA.put("page", "alibaba-inc.com");
behaviorList.add(userA.toJSONString());

userA = new JSONObject();
userA.put("time", DateUtil.parse("2021-09-09 10:01:00"));
userA.put("user", "userA");
userA.put("page", "sina.com");
behaviorList.add(userA.toJSONString());

userA = new JSONObject();
userA.put("time", DateUtil.parse("2021-09-09 10:03:00"));
userA.put("user", "userA");
userA.put("page", "alibaba-inc.com");
behaviorList.add(userA.toJSONString());

JSONObject userB = new JSONObject();
userB.put("time", DateUtil.parse("2021-09-09 10:00:00"));
userB.put("user", "userB");
userB.put("page", "sohu.com");
behaviorList.add(userB.toJSONString());

JSONObject userC = new JSONObject();
userC.put("time", DateUtil.parse("2021-09-09 10:00:00"));
userC.put("user", "userC");
userC.put("page", "qq.com");
behaviorList.add(userC.toJSONString());

userC = new JSONObject();
userC.put("time", DateUtil.parse("2021-09-09 10:03:06"));
userC.put("user", "userC");
userC.put("page", "qq.com");
behaviorList.add(userC.toJSONString());

File dataFile = null;
try {
dataFile = File.createTempFile("behavior", ".txt");
FileUtils.writeLines(dataFile, behaviorList);
} catch (Exception e) {
e.printStackTrace();
}

File resultFile = null;
try {
resultFile = File.createTempFile("behavior.txt", ".session");
} catch (Exception e) {
e.printStackTrace();
}

StreamBuilder.dataStream("namespace", "count_distinct_test")
.fromFile(dataFile.getAbsolutePath(), false)
.map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
.window(TumblingWindow.of(Time.minutes(5), "time"))
.groupBy("user")
.setLocalStorageOnly(true)
.count_distinct("page", "uv")
.count_distinct_large("page", "uv_large")
.count_distinct_2("page","uv_2")
.toDataSteam()
.toFile(resultFile.getAbsolutePath()).start(true);

try {
Thread.sleep(6 * 60 * 1000);
List<String> sessionList = FileUtils.readLines(resultFile, "UTF-8");
Map<String, Integer> statisticMap = new HashMap<>(4);
for (String line : sessionList) {
JSONObject object = JSONObject.parseObject(line);
String user = object.getString("user");
Integer userVisitCount = object.getInteger("uv");
Integer userVisitCountBasedRocksDB = object.getInteger("uv_2");
Integer userVisitCountLarge = object.getInteger("uv_large");
Assert.assertEquals(userVisitCount, userVisitCountLarge);
Assert.assertEquals(userVisitCount, userVisitCountBasedRocksDB);
statisticMap.put(user, userVisitCount);
}
Assert.assertEquals(3, statisticMap.size());
Assert.assertEquals(2, statisticMap.get("userA").longValue());
Assert.assertEquals(1, statisticMap.get("userB").longValue());
Assert.assertEquals(1, statisticMap.get("userC").longValue());

} catch (Exception e) {
e.printStackTrace();
} finally {
dataFile.deleteOnExit();
resultFile.deleteOnExit();
}
}

}
7 changes: 7 additions & 0 deletions rocketmq-streams-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
<version>3.11</version>
</dependency>

<!--hyperLogLog used in dv computation-->
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>

<!-- 测试依赖 -->
<dependency>
<groupId>junit</groupId>
Expand Down
Loading