简介

功能

对源源不断的数据流进行实时的计算,并且实时的输出计算结果,方便用户对大量的数据进行实时分析。计算逻辑采用标准的SQL语法,无需编程。

场景

流式计算的应用场景非常广,比较常见的有:统计每个设备过去5分钟的滑动平均温度;统计网站每分钟的PV;监控设备离线;统计机器每个班次的产量等。

视频教程

了解流式计算最快的方式是观看我们的视频教程。下面是两个视频教程的链接:

  1. 入门教程:

    https://cloud.baidu.com/doc/IOT/VideoCenter.html#.66.1B.60.37.ED.20.CE.26.2B.6B.A8.08.1D.7E.C1.B5

  2. 进阶教程:

    https://cloud.baidu.com/doc/IOT/VideoCenter.html#.41.64.E6.C8.1B.BA.F7.18.83.24.6F.48.CC.A4.84.50

快速上手

下面以统计每个设备每分钟平均温度为例,帮助您在5分钟内创建一个流式计算任务。

创建数据源

假设您的多个设备往同一个MQTT主题发送如下格式的消息:

{
    "ts": 1523427180,
    "deviceid": "device01",
    "temperature": 35.5
}

其中

ts: 为消息产生的时间戳
deviceid: 设备的编号
temperature: 设备的温度

步骤:

  1. 转到流式计算的数据源列表
  2. 点击左上角创建数据源按钮
  3. 按如图所示,填写各个参数

    图片

  4. 点击保存

创建数据目的地

数据目的地为接收流式计算结果的地方。我们假设计算结果发布到实例=yyj, 主题=device/result这个MQTT主题中去,并且数据格式为:

{
    "ts_start": 1523427180000,
    "deviceid": "device01",
    "avg_temperature": 35.5
}

步骤:

  1. 转到流式计算的数据目的地列表
  2. 点击左上角创建数据目的地按钮
  3. 按下图所示,填写参数:

    图片

创建任务

任务是表达计算逻辑的地方,我们从前面创建的mysource数据源取数据,计算结果发布到前面创建的数据目的地mysink中去。

步骤:

  1. 转到流式计算的任务列表
  2. 点击左上角的创建任务按钮
  3. 填写如下参数:

    名称: myjob

    时间类型: EVENTTIME

    并发度: 1

    SQL:

    INSERT INTO mysink 
    SELECT
       Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts_start, 
       deviceid, 
       AVG(temperature) AS avg_temperature 
    FROM mysource 
    GROUP BY 
       Tumble(rowtime, INTERVAL '1' MINUTE),
       deviceid
    

    说明:SQL中的mysink, mysource分别是对前面创建的数据源和目的地的引用。

  4. 点击确定

  5. 在任务列表页,点击任务myjob右边的启动链接

    稍等片刻再次刷新,任务应该处于运行中状态

查看结果

为了接收任务输出的结果,我们使用MQTT客户端工具(e.g MQTTBox, MQTT.fx),或者MQTT客户端库(e.g paho.mqtt.c, paho.mqtt.java),订阅数据目的地mysink中设定的主题(实例=yyj, 主题=device/result)。

推送数据

流式计算已经在运行了,现在我们需要向数据源MQTT主题发布一些数据,以便我们能观察到其计算结果。以mysource设定的主题(实例=yyj, 主题=device/temp)为例,依次发布如下几条消息:

{"ts": 1523427180, "deviceid": "device01", "temperature": 35.5}
{"ts": 1523427181, "deviceid": "device02", "temperature": 40}
{"ts": 1523427185, "deviceid": "device01", "temperature": 36.5}
{"ts": 1523427187, "deviceid": "device02", "temperature": 42}
{"ts": 1523427245, "deviceid": "device01", "temperature": 35.0}

此时,应该从device/result主题收到2条(device01和device02各一条)计算结果,如下:

{"ts_start": 1523427180000, "deviceid": "device01", "avg_temperature": 36.0}
{"ts_start": 1523427180000, "deviceid": "device02", "avg_temperature": 41}

说明:

上述结果为前面4条输入消息的计算结果,第5条输入消息需待更多的消息输入才能有输出。1523427180000为一个分钟窗口的开始时间(Tumble_Start),单位毫秒。