简介
所有文档

          规则引擎 Rule Engine

          简介

          功能

          对源源不断的数据流进行实时的计算,并且实时的输出计算结果,方便用户对大量的数据进行实时分析。计算逻辑采用标准的SQL语法,无需编程。

          场景

          流式计算的应用场景非常广,比较常见的有:统计每个设备过去5分钟的滑动平均温度;统计网站每分钟的PV;监控设备离线;统计机器每个班次的产量等。

          视频教程

          了解流式计算最快的方式是观看我们的视频教程。下面是两个视频教程的链接:

          1.入门教程视频

          2.进阶教程视频

          快速上手

          下面以统计每个设备每分钟平均温度为例,帮助您在5分钟内创建一个流式计算任务。

          创建数据源

          假设您的多个设备往同一个MQTT主题发送如下格式的消息:

          {
              "ts": 1523427180,
              "deviceid": "device01",
              "temperature": 35.5
          }

          其中
          ts: 为消息产生的时间戳
          deviceid: 设备的编号
          temperature: 设备的温度

          步骤:

          1. 转到流式计算的数据源列表
          2. 点击左上角创建数据源按钮
          3. 按如图所示,填写各个参数

            图片

          4. 点击保存

          创建数据目的地

          数据目的地为接收流式计算结果的地方。我们假设计算结果发布到实例=yyj, 主题=device/result这个MQTT主题中去,并且数据格式为:

          {
              "ts_start": 1523427180000,
              "deviceid": "device01",
              "avg_temperature": 35.5
          }

          步骤:

          1. 转到流式计算的数据目的地列表
          2. 点击左上角创建数据目的地按钮
          3. 按下图所示,填写参数:

            图片

          创建任务

          任务是表达计算逻辑的地方,我们从前面创建的mysource数据源取数据,计算结果发布到前面创建的数据目的地mysink中去。

          步骤:

          1. 转到流式计算的任务列表
          2. 点击左上角的创建任务按钮
          3. 填写如下参数:

            名称: myjob

            时间类型: EVENTTIME

            并发度: 1

            SQL:

            INSERT INTO mysink 
            SELECT
                Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts_start, 
                deviceid, 
                AVG(temperature) AS avg_temperature 
            FROM mysource 
            GROUP BY 
                Tumble(rowtime, INTERVAL '1' MINUTE),
                deviceid

            说明:SQL中的mysink, mysource分别是对前面创建的数据源和目的地的引用。

          4. 点击确定
          5. 在任务列表页,点击任务myjob右边的启动链接

            稍等片刻再次刷新,任务应该处于运行中状态

          查看结果

          为了接收任务输出的结果,我们使用MQTT客户端工具(e.g MQTTBox, MQTT.fx),或者MQTT客户端库(e.g paho.mqtt.c, paho.mqtt.java),订阅数据目的地mysink中设定的主题(实例=yyj, 主题=device/result)。

          推送数据

          流式计算已经在运行了,现在我们需要向数据源MQTT主题发布一些数据,以便我们能观察到其计算结果。以mysource设定的主题(实例=yyj, 主题=device/temp)为例,依次发布如下几条消息:

          {"ts": 1523427180, "deviceid": "device01", "temperature": 35.5}
          {"ts": 1523427181, "deviceid": "device02", "temperature": 40}
          {"ts": 1523427185, "deviceid": "device01", "temperature": 36.5}
          {"ts": 1523427187, "deviceid": "device02", "temperature": 42}
          {"ts": 1523427245, "deviceid": "device01", "temperature": 35.0}

          此时,应该从device/result主题收到2条(device01和device02各一条)计算结果,如下:

          {"ts_start": 1523427180000, "deviceid": "device01", "avg_temperature": 36.0}
          {"ts_start": 1523427180000, "deviceid": "device02", "avg_temperature": 41}

          说明:

          上述结果为前面4条输入消息的计算结果,第5条输入消息需待更多的消息输入才能有输出。1523427180000为一个分钟窗口的开始时间(Tumble_Start),单位毫秒。

          上一篇
          操作指南
          下一篇
          基本概念