简介
功能
对源源不断的数据流进行实时的计算,并且实时的输出计算结果,方便用户对大量的数据进行实时分析。计算逻辑采用标准的SQL语法,无需编程。
场景
流式计算的应用场景非常广,比较常见的有:统计每个设备过去5分钟的滑动平均温度;统计网站每分钟的PV;监控设备离线;统计机器每个班次的产量等。
视频教程
了解流式计算最快的方式是观看我们的视频教程。下面是两个视频教程的链接:
1.入门教程视频
2.进阶教程视频
快速上手
下面以统计每个设备每分钟平均温度为例,帮助您在5分钟内创建一个流式计算任务。
创建数据源
假设您的多个设备往同一个MQTT主题发送如下格式的消息:
{
"ts": 1523427180,
"deviceid": "device01",
"temperature": 35.5
}
其中
ts: 为消息产生的时间戳
deviceid: 设备的编号
temperature: 设备的温度
步骤:
- 转到流式计算的数据源列表
- 点击左上角创建数据源按钮
-
按如图所示,填写各个参数
- 点击保存
创建数据目的地
数据目的地为接收流式计算结果的地方。我们假设计算结果发布到实例=yyj, 主题=device/result这个MQTT主题中去,并且数据格式为:
{
"ts_start": 1523427180000,
"deviceid": "device01",
"avg_temperature": 35.5
}
步骤:
- 转到流式计算的数据目的地列表
- 点击左上角创建数据目的地按钮
-
按下图所示,填写参数:
创建任务
任务是表达计算逻辑的地方,我们从前面创建的mysource数据源取数据,计算结果发布到前面创建的数据目的地mysink中去。
步骤:
- 转到流式计算的任务列表
- 点击左上角的创建任务按钮
-
填写如下参数:
名称: myjob
时间类型: EVENTTIME
并发度: 1
SQL:
INSERT INTO mysink SELECT Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts_start, deviceid, AVG(temperature) AS avg_temperature FROM mysource GROUP BY Tumble(rowtime, INTERVAL '1' MINUTE), deviceid
说明:SQL中的mysink, mysource分别是对前面创建的数据源和目的地的引用。
- 点击确定
-
在任务列表页,点击任务myjob右边的启动链接
稍等片刻再次刷新,任务应该处于运行中状态
查看结果
为了接收任务输出的结果,我们使用MQTT客户端工具(e.g MQTTBox, MQTT.fx),或者MQTT客户端库(e.g paho.mqtt.c, paho.mqtt.java),订阅数据目的地mysink中设定的主题(实例=yyj, 主题=device/result)。
推送数据
流式计算已经在运行了,现在我们需要向数据源MQTT主题发布一些数据,以便我们能观察到其计算结果。以mysource设定的主题(实例=yyj, 主题=device/temp)为例,依次发布如下几条消息:
{"ts": 1523427180, "deviceid": "device01", "temperature": 35.5}
{"ts": 1523427181, "deviceid": "device02", "temperature": 40}
{"ts": 1523427185, "deviceid": "device01", "temperature": 36.5}
{"ts": 1523427187, "deviceid": "device02", "temperature": 42}
{"ts": 1523427245, "deviceid": "device01", "temperature": 35.0}
此时,应该从device/result主题收到2条(device01和device02各一条)计算结果,如下:
{"ts_start": 1523427180000, "deviceid": "device01", "avg_temperature": 36.0}
{"ts_start": 1523427180000, "deviceid": "device02", "avg_temperature": 41}
说明:
上述结果为前面4条输入消息的计算结果,第5条输入消息需待更多的消息输入才能有输出。1523427180000为一个分钟窗口的开始时间(Tumble_Start),单位毫秒。