diff --git a/QUICKSTART.md b/QUICKSTART.md deleted file mode 100644 index f2656698..00000000 --- a/QUICKSTART.md +++ /dev/null @@ -1,81 +0,0 @@ -# Quick Start - -本文档详细介绍了如何在rocketmq上执行流计算任务; - -## 所需环境 - -+ 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below) -+ 64bit JDK 1.8+; -+ Maven 3.2.X - -## 使用步骤 - -### 1. 创建maven项目, 并依赖rocketmq-streams的客户端 - -```xml - - - org.apache.rocketmq - rocketmq-streams-clients - 1.0.0-SNAPSHOT - -``` - -### 2. 在主函数中按照streams的开发规范,编写业务逻辑 - -```java -import org.apache.rocketmq.streams.client.transform.DataStream; - -public static void main(String[]args){ - DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline"); - - source - .fromFile("~/admin/data/text.txt",false) - .map(message->message) - .toPrint(1) - .start(); - } -``` - -### 3. 在pom.xml中加入shade插件, 将依赖的stream与业务代码一并打包, 形成-shaded.jar 包 - -```xml - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - - package - - shade - - - false - true - - - org.apache.rocketmq:rocketmq-streams-clients - - - - - - - -``` - -### 4. 
将jar包拷贝到应用服务器,作为普通的java应用直接运行 - -``` - java -jar XXXX-shade.jar \ - -Dlog4j.level=ERROR \ - -Dlog4j.home=/logs \ - -Xms1024m \ - -Xmx1024m -``` - - - - diff --git a/README.md b/README.md index 36b72709..d949a7b3 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ) ## [中文文档](./README-Chinese.md) -## [Quick Start](./QUICKSTART.md) +## [Quick Start](./quick_start.md) ## Features diff --git a/pom.xml b/pom.xml index 5fbb574a..1d2a6364 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 1.1 3.2.13.RELEASE 1.0-rc5 - 5.1.40 + 8.0.26 1.2.78 2.2.1 4.5.2 @@ -112,13 +112,13 @@ .asf.yaml README.md README-Chinese.md - QUICKSTART.md + quick_start.md .github/** */target/** */*.iml **/*.txt **/*.cs - src/test/resources/window_msg_* + **/*.sql @@ -283,6 +283,12 @@ rocketmq-streams-channel-rocketmq ${project.version} + + org.apache.rocketmq + rocketmq-streams-dbinit + ${project.version} + + diff --git a/quick_start.md b/quick_start.md new file mode 100644 index 00000000..6f6a3144 --- /dev/null +++ b/quick_start.md @@ -0,0 +1,71 @@ +## rocketmq-streams 快速搭建 +--- +### 前言 +本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/) + + +### 1、源码构建 + +#### 1.1、构建环境 + - JDK 1.8 and above + - Maven 3.2 and above + +#### 1.2、构建Rocketmq-streams + +`git clone https://github.com/apache/rocketmq-streams.git` +`cd rocketmq-streams` +`mvn clean -DskipTests install -U` + + +### 2、基于rocketmq-streams创建应用 + +#### 2.1、pom依赖 +```xml + + org.apache.rocketmq + rocketmq-streams-clients + +``` +#### 2.2、shade clients依赖包 +```xml + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + false + true + + + org.apache.rocketmq:rocketmq-streams-clients + + + + + + + + +``` + +#### 2.3、编写业务代码 +Please see the 
[rocketmq-streams-examples](rocketmq-streams-examples/README.md) +#### 2.4、运行 +- 前提:在从rocketmq中读取数据做流处理时,需要允许topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。 +- 命令: +``` + java -jar XXXX-shade.jar \ + -Dlog4j.level=ERROR \ + -Dlog4j.home=/logs \ + -Xms1024m \ + -Xmx1024m +``` + diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java index 6e18ba79..eb8134d9 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; + public class RocketMQSink extends AbstractSupportShuffleSink { private static final Log LOG = LogFactory.getLog(RocketMQSink.class); diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java index b45df861..34dc221c 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java @@ -41,12 +41,7 @@ import org.apache.rocketmq.streams.common.channel.source.AbstractSupportOffsetResetSource; import org.apache.rocketmq.streams.common.channel.split.ISplit; import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; -import org.apache.rocketmq.streams.common.context.AbstractContext; -import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.common.interfaces.IStreamOperator; -import org.apache.rocketmq.streams.common.utils.DateUtil; 
import org.apache.rocketmq.streams.common.utils.ReflectUtil; -import org.apache.rocketmq.streams.common.utils.RuntimeUtil; import org.apache.rocketmq.streams.debug.DebugWriter; import org.apache.rocketmq.streams.queue.RocketMQMessageQueue; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -60,7 +55,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class RocketMQSource extends AbstractSupportOffsetResetSource { @@ -134,6 +128,8 @@ protected DefaultMQPushConsumer startConsumer() { for (MessageExt msg : msgs) { String data = new String(msg.getBody(), CHARSET); JSONObject jsonObject = create(data); + + String queueId = RocketMQMessageQueue.getQueueId(context.getMessageQueue()); String offset = msg.getQueueOffset() + ""; org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false); diff --git a/rocketmq-streams-clients/pom.xml b/rocketmq-streams-clients/pom.xml index 5632fd6d..dfe09868 100644 --- a/rocketmq-streams-clients/pom.xml +++ b/rocketmq-streams-clients/pom.xml @@ -52,6 +52,10 @@ org.apache.rocketmq rocketmq-streams-window + + org.apache.rocketmq + rocketmq-streams-dbinit + org.slf4j slf4j-api diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java index 0a052fab..a8b99fa9 100644 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java @@ -18,11 +18,6 @@ package org.apache.rocketmq.streams.client; import com.google.common.collect.Maps; - -import java.util.Map; -import java.util.Properties; -import java.util.Set; - import 
org.apache.rocketmq.streams.client.strategy.Strategy; import org.apache.rocketmq.streams.client.transform.DataStream; import org.apache.rocketmq.streams.common.component.ComponentCreator; @@ -32,6 +27,10 @@ import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder; import org.apache.rocketmq.streams.configurable.ConfigurableComponent; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + public class DataStreamAction extends DataStream { private final Map properties = Maps.newHashMap(); diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java index 49e85594..2684c18a 100644 --- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java +++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java @@ -28,7 +28,11 @@ import org.junit.Test; import java.io.Serializable; -import java.sql.*; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; public class DataStreamTest implements Serializable { diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java index 688b4859..3eef4f22 100644 --- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java +++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java @@ -18,6 +18,16 @@ package org.apache.rocketmq.streams.client.windows; import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.streams.client.transform.DataStream; +import org.apache.rocketmq.streams.client.transform.window.Time; 
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; +import org.apache.rocketmq.streams.common.functions.ForEachFunction; +import org.apache.rocketmq.streams.common.functions.MapFunction; +import org.apache.rocketmq.streams.common.utils.DateUtil; +import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; + import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -28,15 +38,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.rocketmq.streams.client.transform.DataStream; -import org.apache.rocketmq.streams.client.transform.window.Time; -import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; -import org.apache.rocketmq.streams.common.functions.ForEachFunction; -import org.apache.rocketmq.streams.common.functions.MapFunction; -import org.apache.rocketmq.streams.common.utils.DateUtil; -import org.apache.rocketmq.streams.common.utils.FileUtil; -import org.apache.rocketmq.streams.common.utils.MapKeyUtil; -import org.apache.rocketmq.streams.common.utils.StringUtil; import static junit.framework.TestCase.assertTrue; diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_10 b/rocketmq-streams-clients/src/test/resources/window_msg_10.txt similarity index 100% rename from rocketmq-streams-clients/src/test/resources/window_msg_10 rename to rocketmq-streams-clients/src/test/resources/window_msg_10.txt diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_100 b/rocketmq-streams-clients/src/test/resources/window_msg_100.txt similarity index 100% rename from rocketmq-streams-clients/src/test/resources/window_msg_100 rename to rocketmq-streams-clients/src/test/resources/window_msg_100.txt diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_1000 
b/rocketmq-streams-clients/src/test/resources/window_msg_1000.txt similarity index 100% rename from rocketmq-streams-clients/src/test/resources/window_msg_1000 rename to rocketmq-streams-clients/src/test/resources/window_msg_1000.txt diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_10000 b/rocketmq-streams-clients/src/test/resources/window_msg_10000.txt similarity index 100% rename from rocketmq-streams-clients/src/test/resources/window_msg_10000 rename to rocketmq-streams-clients/src/test/resources/window_msg_10000.txt diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_88121 b/rocketmq-streams-clients/src/test/resources/window_msg_88121.txt similarity index 100% rename from rocketmq-streams-clients/src/test/resources/window_msg_88121 rename to rocketmq-streams-clients/src/test/resources/window_msg_88121.txt diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java index 8c4f63cd..ebb94159 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java @@ -16,11 +16,10 @@ */ package org.apache.rocketmq.streams.common.channel.impl; -import java.util.List; - import org.apache.rocketmq.streams.common.channel.sink.AbstractSink; import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.common.utils.PrintUtil; + +import java.util.List; /** * 测试使用,输出就是把消息打印出来 @@ -30,7 +29,7 @@ public class OutputPrintChannel extends AbstractSink { @Override protected boolean batchInsert(List messages) { for (IMessage msg : messages) { - //System.out.println(msg.getMessageValue()); + System.out.println(msg.getMessageValue()); } return false; } diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java index f8fcbcd9..3add3df8 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.net.URL; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -55,7 +56,7 @@ public FileSource(String filePath) { @Override protected boolean initConfigurable() { super.initConfigurable(); - File file = new File(filePath); + File file = getFile(filePath); if (file.exists() && file.isDirectory()) { executorService = new ThreadPoolExecutor(maxThread, maxThread, 0L, TimeUnit.MILLISECONDS, @@ -64,6 +65,23 @@ protected boolean initConfigurable() { return true; } + private File getFile(String filePath) { + File file = new File(filePath); + if (!file.exists()) { + ClassLoader loader = getClass().getClassLoader(); + URL url = loader.getResource(filePath); + + if (url != null) { + String path = url.getFile(); + file = new File(path); + this.filePath = path; + } + } + return file; + + + } + @Override protected boolean startSource() { @@ -97,7 +115,7 @@ protected boolean startSource() { */ protected LinkedBlockingQueue createIteratorList() { LinkedBlockingQueue iterators = new LinkedBlockingQueue<>(1000); - File file = new File(filePath); + File file = getFile(filePath); if (file.exists() == false) { return null; } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java index 34d55368..41a113c9 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java @@ -17,13 +17,16 @@ package org.apache.rocketmq.streams.common.datatype; import com.alibaba.fastjson.JSONObject; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.utils.NumberUtils; + import java.sql.Timestamp; import java.text.ParsePosition; import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Date; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.rocketmq.streams.common.utils.NumberUtils; public class DateDataType extends BaseDataType { @@ -85,6 +88,10 @@ public Date convert(Object object) { if (Timestamp.class.isInstance(object)) { return new Date(((Timestamp)object).getTime()); } + if (object instanceof LocalDateTime) { + LocalDateTime tempTime = (LocalDateTime) object; + return Date.from(tempTime.atZone(ZoneId.systemDefault()).toInstant()); + } return super.convert(object); } diff --git a/rocketmq-streams-dbinit/pom.xml b/rocketmq-streams-dbinit/pom.xml index 1eba812a..14f56794 100644 --- a/rocketmq-streams-dbinit/pom.xml +++ b/rocketmq-streams-dbinit/pom.xml @@ -31,13 +31,10 @@ src/main/resources - **/*.sql **/*.properties - - true diff --git a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql index 0c6bc816..d6c23ee5 100644 --- a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql +++ b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql @@ -1,24 +1,9 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - CREATE TABLE IF NOT EXISTS `window_max_value` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `gmt_create` datetime NOT NULL, `gmt_modified` datetime NOT NULL, + `max_offset` varchar(20) NOT NULL, + `is_max_offset_long` int(11) DEFAULT NULL, `max_value` bigint(20) unsigned NOT NULL, `max_event_time` bigint(20) unsigned NOT NULL, `msg_key` varchar(256) NOT NULL, diff --git a/rocketmq-streams-examples/README.md b/rocketmq-streams-examples/README.md new file mode 100644 index 00000000..53bb503c --- /dev/null +++ b/rocketmq-streams-examples/README.md @@ -0,0 +1,159 @@ +## rocketmq-streams-examples + +### 1、fileSource example +逐行读取文件数据,并打印出来。 +```java +public class FileSourceExample { + public static void main(String[] args) { + DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + source.fromFile("data.txt", false) + .map(message -> message) + .toPrint(1) + .start(); + } +} + +``` + + +### 2、分时间段,统计分组中某字段的和 + +#### 2.1 安装rocketmq +可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/) + +#### 2.2 源数据 +[源数据](./../rocketmq-streams-examples/src/main/resources/data.txt) +```xml +{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"} 
+{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"} +{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"} +{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"} +{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"} +{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"} +{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"} +{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"} +{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"} +{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"} +``` + +#### 2.3 代码示例 + +[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java) + + +#### 2.4 结果说明 +这个例子中,使用rocketmq-streams消费rocketmq中的数据,并按照ProjectName和LogStore两个字段联合分组统计,两个字段的值相同,分为一组。 +分别统计每组的InFlow和OutFlow两字段累计和。 + +data.txt数据运行的结果部分如下: + +```xml +"InFlow":22,"total":4,"ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":18 +"InFlow":18,"total":3,"ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":15 +"InFlow":15,"total":3,"ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":12 +``` +可见"ProjectName":"ProjectName-0","LogStore":"LogStore-0"分组共有4条数据,"ProjectName":"ProjectName-2","LogStore":"LogStore-2"分组3条数据, +"ProjectName":"ProjectName-1","LogStore":"LogStore-1"分组3条数据,总共10条数据。结果与源数据一致。 + +### 3、网页点击统计 +#### 3.1、数据说明 +原始数据为resources路径下的[pageClickData.txt](./../rocketmq-streams-examples/src/main/resources/pageClickData.txt) + +第一列是用户id,第二列是用户点击时间,第三列是请求方法,最后一列是网页地址 +```xml +{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"} +{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"} +{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"} 
+{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"} +{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"} +{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"} +{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-1"} +{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-2"} +{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"} +{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"} +``` + +#### 3.1、统计某段时间窗口内用户点击网页次数 +[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java) + +结果: +```xml +{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"SPVGTV6DaXmxV5mGNzQixQ==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"2"} +{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"3"} +{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"1"} +{"start_time":"2021-09-15 18:01:00","total":1,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"3"} +{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"YIgEKptN2Wf+Oq2m8sEcYw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"2"} +{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"iYKnwMYAzXFJYbO1KvDnng==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"1"} +{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"HBojuU6/2F/6llkyefECxw==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","userId":"4"} +``` + +在时间范围 18:00:00- 18:01:00内: + +|userId|点击次数| +|------|---| +| 1 | 2 | +| 2 | 1 | +| 3 | 1 | + +在时间范围 18:01:00- 18:02:00内: + 
+|userId|点击次数| +|------|---| +| 1 | 2 | +| 2 | 2 | +| 3 | 1 | + +在时间范围 18:02:00- 18:03:00内: + +|userId|点击次数| +|------|---| +| 4 | 1 | + +可查看原数据文件,eventTime为时间字段,简单检查后上述结果与预期相符合。 + +#### 3.2、统计某段时间窗口内,被点击次数最多的网页 +[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java) + +运行结果: +```xml +{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-1"} +{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"seECZRcaQSRsET1rDc6ZAw==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-2"} +{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-3"} +{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"uCqvAeaLTYRnjQm8dCZOvw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-2"} +{"start_time":"2021-09-15 18:01:00","total":3,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-3"} +{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"NdgwYMT8azNMu55NUIvygg==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","url":"page-1"} + +``` +在时间窗口18:00:00 - 18:01:00 内,有4条数据; + +在时间窗口18:01:00 - 18:02:00 内,有5条数据; + +在时间窗口18:02:00 - 18:03:00 内,有1条数据; + +分钟统计窗口内,被点击次数最多的网页. 
+得到上述数据后,需要按照窗口进行筛选最大值,需要再次计算。 +代码: +```java + public void findMax() { + DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1"); + source.fromFile("/home/result.txt", false) + .map(message -> JSONObject.parseObject((String) message)) + .window(TumblingWindow.of(Time.seconds(5))) + .groupBy("start_time","end_time") + .max("total") + .waterMark(1) + .setLocalStorageOnly(true) + .toDataSteam() + .toPrint(1) + .start(); + } + +``` +得到结果: +```xml +{"start_time":"2021-09-17 11:09:35","total":"2","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000001,"end_time":"2021-09-17 11:09:40"} +{"start_time":"2021-09-17 11:09:35","total":"3","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000002,"end_time":"2021-09-17 11:09:40"} +{"start_time":"2021-09-17 11:09:35","total":"1","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000003,"end_time":"2021-09-17 11:09:40"} +``` + +可以得到三个窗口中网页点击次数最多分别是2次,3次,1次。 diff --git a/rocketmq-streams-examples/pom.xml b/rocketmq-streams-examples/pom.xml index ee6bbf63..8d97fa61 100644 --- a/rocketmq-streams-examples/pom.xml +++ b/rocketmq-streams-examples/pom.xml @@ -27,24 +27,48 @@ rocketmq-streams-examples ROCKETMQ STREAMS :: examples + + + UTF-8 + ${file_encoding} + 8 + 8 + + + org.apache.rocketmq rocketmq-streams-clients - - - ch.qos.logback - logback-classic - - jar - - UTF-8 - ${file_encoding} - 8 - 8 - + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + false + true + + + org.apache.rocketmq:rocketmq-streams-clients + + + + + + + + \ No newline at end of file diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java new file mode 100644 index 00000000..2daa5256 --- /dev/null +++ 
b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java @@ -0,0 +1,96 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.rocketmq.streams.examples.checkpoint; + +import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.streams.client.StreamBuilder; +import org.apache.rocketmq.streams.client.source.DataStreamSource; +import org.apache.rocketmq.streams.client.transform.window.Time; +import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.dbinit.mysql.delegate.DBDelegate; +import org.apache.rocketmq.streams.dbinit.mysql.delegate.DBDelegateFactory; +import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile; + +import static org.apache.rocketmq.streams.db.driver.DriverBuilder.DEFALUT_JDBC_DRIVER; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS; +import static 
org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC; + + +public class RemoteCheckpointTest { + //replace with your mysql url, database name can be anyone else. + private static final String URL = "jdbc:mysql://localhost:3306/rocketmq_streams"; + // user name of mysql + private static final String USER_NAME = ""; + //password of mysql + private static final String PASSWORD = ""; + + + static { + ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB"); + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL); + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_DRIVER, DEFALUT_JDBC_DRIVER); + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME); + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD); + } + + public static void main(String[] args) { + ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC); + DBDelegate delegate = DBDelegateFactory.getDelegate(); + delegate.init(); + + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + System.out.println("begin streams code."); + + DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + source.fromRocketmq( + RMQ_TOPIC, + RMQ_CONSUMER_GROUP_NAME, + false, + NAMESRV_ADDRESS) + .filter((message) -> { + try { + JSONObject.parseObject((String) message); + } catch (Throwable t) { + // if can not convert to json, discard it.because all operator are base on json. + return true; + } + return false; + }) + //must convert message to json. 
+ .map(message -> JSONObject.parseObject((String) message)) + .window(TumblingWindow.of(Time.seconds(10))) + .groupBy("ProjectName","LogStore") + .sum("OutFlow", "OutFlow") + .sum("InFlow", "InFlow") + .count("total") + .waterMark(5) + .setLocalStorageOnly(false) + .toDataSteam() + .toPrint(1) + .start(); + } + +} diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java index d568d5ff..a7b43611 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java @@ -22,7 +22,7 @@ public class FileSourceExample { public static void main(String[] args) { DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); - source.fromFile("/your/file/path", false) + source.fromFile("data.txt", false) .map(message -> message) .toPrint(1) .start(); diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java new file mode 100644 index 00000000..95bfd5c2 --- /dev/null +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java @@ -0,0 +1,81 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.rocketmq.streams.examples.pageclick; + +import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.streams.client.StreamBuilder; +import org.apache.rocketmq.streams.client.source.DataStreamSource; +import org.apache.rocketmq.streams.client.strategy.WindowStrategy; +import org.apache.rocketmq.streams.client.transform.window.Time; +import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; +import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile; +import org.junit.Test; + +public class PageDimension { + private static final String topic = "pageClick"; + private static final String namesrv = "127.0.0.1:9876"; + + /** + * In a certain period of time, how many times did a user click on a certain webpage + * + * @param args + */ + public static void main(String[] args) { + ProducerFromFile.produce("pageClickData.txt", namesrv, topic); + + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + System.out.println("begin streams code."); + + DataStreamSource source = StreamBuilder.dataStream("pageClickNS", "pageClickPL"); + source.fromRocketmq(topic, "pageClickGroup", false, namesrv) + .map(message -> JSONObject.parseObject((String) message)) + .window(TumblingWindow.of(Time.minutes(1))) + .groupBy("url") + .setTimeField("eventTime") + .count("total") + .waterMark(1) + .setLocalStorageOnly(true) + .toDataSteam() + .toFile("/home/result.txt") + .with(WindowStrategy.highPerformance()) + .start(); + } + + @Test + public void findMax() { + + 
DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1"); + source.fromFile("/home/result.txt", false) + .map(message -> JSONObject.parseObject((String) message)) + .window(TumblingWindow.of(Time.seconds(5))) + .groupBy("start_time","end_time") + .max("total") + .waterMark(1) + .setLocalStorageOnly(true) + .toDataSteam() + .toPrint(1) + .start(); + + } + +} diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java new file mode 100644 index 00000000..c7e6c745 --- /dev/null +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java @@ -0,0 +1,67 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.rocketmq.streams.examples.pageclick; + +import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.streams.client.StreamBuilder; +import org.apache.rocketmq.streams.client.source.DataStreamSource; +import org.apache.rocketmq.streams.client.strategy.WindowStrategy; +import org.apache.rocketmq.streams.client.transform.window.Time; +import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; +import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile; + + +public class UsersDimension { + private static final String topic = "pageClick"; + private static final String namesrv = "127.0.0.1:9876"; + + /** + * Count the number of times a user clicks on a webpage within 5s + * @param args + */ + public static void main(String[] args) { + ProducerFromFile.produce("pageClickData.txt",namesrv, topic); + + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + System.out.println("begin streams code."); + + + DataStreamSource source = StreamBuilder.dataStream("pageClickNS", "pageClickPL"); + source.fromRocketmq(topic, "pageClickGroup", false, namesrv) + .map(message -> JSONObject.parseObject((String) message)) + .window(TumblingWindow.of(Time.minutes(1))) + .groupBy("userId") + .setTimeField("eventTime") + .count("total") + .waterMark(1) + .setLocalStorageOnly(true) + .toDataSteam() + .toPrint(1) + .with(WindowStrategy.highPerformance()) + .start(); + + } + + + +} diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java new file mode 100644 index 00000000..c9287f40 --- /dev/null +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java @@ -0,0 +1,27 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. 
See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.rocketmq.streams.examples.rocketmqsource; + +public class Constant { + public static final String NAMESRV_ADDRESS = "127.0.0.1:9876"; + public static final String RMQ_TOPIC = "NormalTestTopic"; + public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-02"; + public static final String TAGS = "*"; +} diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java new file mode 100644 index 00000000..b26b66f3 --- /dev/null +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java @@ -0,0 +1,99 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.rocketmq.streams.examples.rocketmqsource; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +public class ProducerFromFile { + + public static void produce(String filePath, String nameServ, String topic) { + try { + DefaultMQProducer producer = new DefaultMQProducer("test-group"); + producer.setNamesrvAddr(nameServ); + producer.start(); + + List result = ProducerFromFile.read(filePath); + + for (String str : result) { + Message msg = new Message(topic, "", str.getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + //Shut down once the producer instance is not longer in use. 
+ producer.shutdown(); + } catch (Throwable t) { + t.printStackTrace(); + } + + } + + private static File getFile(String filePath) { + File file = new File(filePath); + if (!file.exists()) { + ClassLoader loader = ProducerFromFile.class.getClassLoader(); + URL url = loader.getResource(filePath); + + if (url != null) { + String path = url.getFile(); + file = new File(path); + } + } + return file; + + } + + private static List read(String path) { + File file = getFile(path); + List result = new ArrayList<>(); + BufferedReader reader = null; + try { + reader = new BufferedReader(new FileReader(file)); + + String line = reader.readLine(); + while (line != null && !"".equals(line)) { + result.add(line); + line = reader.readLine(); + } + + } catch (Throwable t) { + t.printStackTrace(); + } finally { + if (reader!= null) { + try { + reader.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + return result; + } +} diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java index c2e6bd13..c56c9758 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java @@ -19,14 +19,30 @@ import org.apache.rocketmq.streams.client.StreamBuilder; import org.apache.rocketmq.streams.client.source.DataStreamSource; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC; + + public class RocketMQSourceExample1 { public static void main(String[] args) { - 
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC); + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + + System.out.println("begin streams code."); + + DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + /** + * 1、before run this case, make sure some data has already been rocketmq. + */ source.fromRocketmq( - RocketMQSourceExample2.RMQ_TOPIC, - RocketMQSourceExample2.RMQ_CONSUMER_GROUP_NAME, - RocketMQSourceExample2.NAMESRV_ADDRESS + RMQ_TOPIC, + RMQ_CONSUMER_GROUP_NAME, + NAMESRV_ADDRESS ) .map(message -> message) .toPrint(1) diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java index c9d9bda4..69c217bb 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java @@ -18,21 +18,28 @@ import org.apache.rocketmq.streams.client.StreamBuilder; import org.apache.rocketmq.streams.client.source.DataStreamSource; -import org.apache.rocketmq.streams.client.transform.window.Time; -import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; -import org.apache.rocketmq.streams.client.transform.window.WindowInfo; import java.util.Arrays; -public class RocketMQSourceExample2 { - public static final String NAMESRV_ADDRESS = "127.0.0.1:9876"; - public static final String RMQ_TOPIC = "NormalTestTopic"; - public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-01"; - public static final String TAGS = "*"; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS; +import static 
org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC; +public class RocketMQSourceExample2 { + /** + * 1、before run this case, make sure some data has already been rocketmq. + */ public static void main(String[] args) { - DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC); + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + + System.out.println("begin streams code."); + + DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); source.fromRocketmq( RMQ_TOPIC, RMQ_CONSUMER_GROUP_NAME, @@ -47,7 +54,7 @@ public static void main(String[] args) { .filter((value) -> { System.out.println("filter: ==========="); String messageValue = (String)value; - return !messageValue.contains("RocketMQ"); + return !messageValue.contains("InFlow"); }) .flatMap((message)->{ String value = (String) message; diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java index 1af62063..84117455 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java @@ -17,20 +17,34 @@ package org.apache.rocketmq.streams.examples.rocketmqsource; +import com.alibaba.fastjson.JSONObject; import org.apache.rocketmq.streams.client.StreamBuilder; import org.apache.rocketmq.streams.client.source.DataStreamSource; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; -public class RocketMQSourceExample3 { 
- public static final String NAMESRV_ADDRESS = "127.0.0.1:9876"; - public static final String RMQ_TOPIC = "NormalTestTopic"; - public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-03"; - public static final String TAGS = "*"; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC; +public class RocketMQSourceExample3 { + /** + * 1、before run this case, make sure some data has already been rocketmq. + */ public static void main(String[] args) { - DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC); + + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + System.out.println("begin streams code."); + + DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); source.fromRocketmq( RMQ_TOPIC, RMQ_CONSUMER_GROUP_NAME, @@ -42,12 +56,19 @@ public static void main(String[] args) { .map(message -> message) .filter((value) -> { String messageValue = (String) value; - return messageValue.contains("RocketMQ"); + return !messageValue.contains("InFlow"); }) .flatMap((message) -> { - String value = (String) message; - String[] result = value.split(" "); - return Arrays.asList(result); + JSONObject jsonObject = JSONObject.parseObject((String) message); + Set> entries = jsonObject.entrySet(); + + List result = new ArrayList<>(); + + for (Map.Entry entry : entries) { + String str = entry.getKey() + ":" + entry.getValue(); + result.add(str); + } + return result; }) .toPrint(1) .start(); diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java 
b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java index a50d6cb0..6dfdfb52 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java @@ -24,15 +24,26 @@ import org.apache.rocketmq.streams.client.transform.window.Time; import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME; +import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC; + public class RocketmqWindowTest { - public static final String NAMESRV_ADDRESS = "127.0.0.1:9876"; - public static final String RMQ_TOPIC = "NormalTestTopic"; - public static final String RMQ_CONSUMER_GROUP_NAME = "group-03"; - public static final String TAGS = "*"; + /** + * 1、before run this case, make sure some data has already been rocketmq. + * 2、rocketmq allow create topic automatically. + */ public static void main(String[] args) { - DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); + ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC); + try { + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + } + System.out.println("begin streams code."); + + DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); source.fromRocketmq( RMQ_TOPIC, RMQ_CONSUMER_GROUP_NAME, @@ -47,11 +58,14 @@ public static void main(String[] args) { } return false; }) + //must convert message to json. 
.map(message -> JSONObject.parseObject((String) message)) - .window(TumblingWindow.of(Time.seconds(1))) - .groupBy("ProjectName", "LogStore") + .window(TumblingWindow.of(Time.seconds(10))) + .groupBy("ProjectName","LogStore") + .sum("OutFlow", "OutFlow") + .sum("InFlow", "InFlow") .count("total") - .waterMark(1) + .waterMark(5) .setLocalStorageOnly(true) .toDataSteam() .toPrint(1) diff --git a/rocketmq-streams-examples/src/main/resources/data.txt b/rocketmq-streams-examples/src/main/resources/data.txt new file mode 100644 index 00000000..d6a9ae9b --- /dev/null +++ b/rocketmq-streams-examples/src/main/resources/data.txt @@ -0,0 +1,11 @@ +{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"} +{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"} +{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"} +{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"} +{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"} +{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"} +{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"} +{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"} +{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"} +{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"} + diff --git a/rocketmq-streams-examples/src/main/resources/pageClickData.txt b/rocketmq-streams-examples/src/main/resources/pageClickData.txt new file mode 100644 index 00000000..51bd89d5 --- /dev/null +++ b/rocketmq-streams-examples/src/main/resources/pageClickData.txt @@ -0,0 +1,11 @@ +{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"} +{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"} +{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"} 
+{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"} +{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"} +{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"} +{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-2"} +{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-3"} +{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"} +{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"} + diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java index 8850ada7..46eb1ddc 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java @@ -16,13 +16,13 @@ */ package org.apache.rocketmq.streams.window.fire; -import java.util.HashMap; -import java.util.Map; - import org.apache.rocketmq.streams.common.channel.source.ISource; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.window.operator.AbstractWindow; +import java.util.HashMap; +import java.util.Map; + public class EventTimeManager { private Map eventTimeManagerMap=new HashMap<>(); protected ISource source; @@ -40,6 +40,8 @@ public void updateEventTime(IMessage message, AbstractWindow window){ } } } + + splitEventTimeManager.updateEventTime(message,window); } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java index 70742f3a..42f54083 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java +++ 
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java @@ -16,32 +16,26 @@ */ package org.apache.rocketmq.streams.window.fire; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.channel.source.AbstractSource; import org.apache.rocketmq.streams.common.channel.source.ISource; import org.apache.rocketmq.streams.common.channel.split.ISplit; import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.common.context.MessageOffset; -import org.apache.rocketmq.streams.common.utils.RuntimeUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; -import org.apache.rocketmq.streams.db.driver.DriverBuilder; -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; import org.apache.rocketmq.streams.window.model.WindowCache; import org.apache.rocketmq.streams.window.model.WindowInstance; import org.apache.rocketmq.streams.window.operator.AbstractWindow; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + public class SplitEventTimeManager { protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class); - protected Map messageSplitId2MaxTime=new HashMap<>(); + protected static Map messageSplitId2MaxTime=new HashMap<>(); private AtomicInteger queueIdCount=new AtomicInteger(0); protected Long lastUpdateTime; diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java index 0f573e27..435aa71e 100644 --- 
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java @@ -16,18 +16,6 @@ */ package org.apache.rocketmq.streams.window.operator.impl; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.rocketmq.streams.common.channel.split.ISplit; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.context.MessageOffset; @@ -48,6 +36,18 @@ import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager; import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + public class WindowOperator extends AbstractShuffleWindow { private static final String ORDER_BY_SPLIT_NUM="_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum @@ -80,7 +80,7 @@ public WindowOperator(int sizeInterval, String groupByFieldName, Map queueId2Offset) { + public int fireWindowInstance(WindowInstance instance, String queueId, Map queueId2Offset) { List windowValues=new ArrayList<>(); int fireCount=0; long startTime=System.currentTimeMillis(); @@ -100,10 +100,9 @@ public int fireWindowInstance(WindowInstance instance, String queueId,Map=windowCache.getBatchSize()){ @@ -121,15 +120,9 @@ public int fireWindowInstance(WindowInstance instance, 
String queueId,Map25000){ -// System.out.println("fire count is "+fireCountAccumulator.get()); -// } - - //long clearStart=System.currentTimeMillis(); clearFire(instance); this.sqlCache.addCache(new FiredNotifySQLElement(queueId,instance.createWindowInstanceId())); - // System.out.println("=============== fire cost is "+(System.currentTimeMillis()-startTime)+"send cost is "+sendCost+" clear cost is "+(System.currentTimeMillis()-clearStart)); - return fireCount; + return fireCount; } protected transient Map shuffleWindowInstanceId2MsgCount=new HashMap<>(); @@ -161,39 +154,39 @@ public void shuffleCalculate(List messages, WindowInstance instance, S } allWindowValues.put(storeKey,windowValue); windowValue.incrementUpdateVersion(); - Integer origValue=(Integer)windowValue.getComputedColumnResultByKey("total"); - if(origValue==null){ - origValue=0; - } + + Integer origValue = getValue(windowValue, "total"); + if(msgs!=null){ for(IMessage message:msgs){ windowValue.calculate(this,message); } } - Integer currentValue=(Integer)windowValue.getComputedColumnResultByKey("total"); - if(currentValue==null){ - currentValue=0; - } + Integer currentValue = getValue(windowValue, "total"); + shuffleCount.addAndGet(-origValue); shuffleCount.addAndGet(currentValue); - // if(shuffleCount.get()>25000){ - // System.out.println("==========shuffle count is "+shuffleCount.get()); - //} - - } if(DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug()){ DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowCalculate(this,new ArrayList(allWindowValues.values()),queueId); } saveStorage(allWindowValues,instance,queueId); - //Integer count=shuffleWindowInstanceId2MsgCount.get(instance.createWindowInstanceId()); - //if(count==null){ - // count=0; - //} - //count+=messages.size(); - //shuffleWindowInstanceId2MsgCount.put(instance.createWindowInstanceId(),count); + } + + private Integer getValue(WindowValue windowValue, String fieldName) { + Object value = 
windowValue.getComputedColumnResultByKey(fieldName); + if (value == null) { + return 0; + } + if (value instanceof Integer) { + return (Integer) value; + } else if (value instanceof String) { + String strValue = (String) value; + return Integer.valueOf(strValue); + } + throw new ClassCastException("value:["+value+"] of fieldName:["+fieldName+"] can not change to number."); } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java index ba17a442..523eb0f3 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java @@ -20,10 +20,6 @@ import com.alibaba.fastjson.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - - -import java.util.concurrent.atomic.AtomicLong; - import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink; import org.apache.rocketmq.streams.common.channel.source.AbstractSource; import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage; @@ -32,32 +28,36 @@ import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; import org.apache.rocketmq.streams.common.checkpoint.CheckPointState; import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.common.context.AbstractContext; +import org.apache.rocketmq.streams.common.context.IMessage; +import org.apache.rocketmq.streams.common.context.Message; import org.apache.rocketmq.streams.common.context.MessageOffset; import org.apache.rocketmq.streams.common.interfaces.ISystemMessage; import org.apache.rocketmq.streams.common.topology.ChainPipeline; import org.apache.rocketmq.streams.common.topology.model.Pipeline; import 
org.apache.rocketmq.streams.common.utils.CollectionUtil; import org.apache.rocketmq.streams.common.utils.DateUtil; -import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; import org.apache.rocketmq.streams.common.utils.TraceUtil; import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; import org.apache.rocketmq.streams.window.debug.DebugWriter; +import org.apache.rocketmq.streams.window.model.WindowCache; +import org.apache.rocketmq.streams.window.model.WindowInstance; import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow; import org.apache.rocketmq.streams.window.operator.AbstractWindow; -import org.apache.rocketmq.streams.common.context.AbstractContext; -import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.common.context.Message; -import org.apache.rocketmq.streams.common.utils.MapKeyUtil; -import org.apache.rocketmq.streams.common.utils.StringUtil; -import org.apache.rocketmq.streams.window.model.WindowInstance; -import org.apache.rocketmq.streams.window.model.WindowCache; import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator; import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement; import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * 负责处理分片