规则引擎Rule Engine

    简介

    功能

    对源源不断的数据流进行实时的计算,并且实时的输出计算结果,方便用户对大量的数据进行实时分析。计算逻辑采用标准的SQL语法,无需编程。

    场景

    流式计算的应用场景非常广,比较常见的有:统计每个设备过去5分钟的滑动平均温度;统计网站每分钟的PV;监控设备离线;统计机器每个班次的产量等。

    视频教程

    了解流式计算最快的方式是观看我们的视频教程。下面是两个视频教程的链接:

    1.入门教程视频

    2.进阶教程视频

    快速上手

    下面以统计每个设备每分钟平均温度为例,帮助您在5分钟内创建一个流式计算任务。

    创建数据源

    假设您的多个设备往同一个MQTT主题发送如下格式的消息:

    {
        "ts": 1523427180,
        "deviceid": "device01",
        "temperature": 35.5
    }

    其中
    ts: 为消息产生的时间戳
    deviceid: 设备的编号
    temperature: 设备的温度

    步骤:

    1. 转到流式计算的数据源列表
    2. 点击左上角创建数据源按钮
    3. 按如图所示,填写各个参数

      图片

    4. 点击保存

    创建数据目的地

    数据目的地为接收流式计算结果的地方。我们假设计算结果发布到实例=yyj, 主题=device/result这个MQTT主题中去,并且数据格式为:

    {
        "ts_start": 1523427180000,
        "deviceid": "device01",
        "avg_temperature": 35.5
    }

    步骤:

    1. 转到流式计算的数据目的地列表
    2. 点击左上角创建数据目的地按钮
    3. 按下图所示,填写参数:

      图片

    创建任务

    任务是表达计算逻辑的地方,我们从前面创建的mysource数据源取数据,计算结果发布到前面创建的数据目的地mysink中去。

    步骤:

    1. 转到流式计算的任务列表
    2. 点击左上角的创建任务按钮
    3. 填写如下参数:

      名称: myjob

      时间类型: EVENTTIME

      并发度: 1

      SQL:

      INSERT INTO mysink 
      SELECT
          Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts_start, 
          deviceid, 
          AVG(temperature) AS avg_temperature 
      FROM mysource 
      GROUP BY 
          Tumble(rowtime, INTERVAL '1' MINUTE),
          deviceid

      说明:SQL中的mysink, mysource分别是对前面创建的数据源和目的地的引用。

    4. 点击确定
    5. 在任务列表页,点击任务myjob右边的启动链接

      稍等片刻再次刷新,任务应该处于运行中状态

    查看结果

    为了接收任务输出的结果,我们使用MQTT客户端工具(e.g MQTTBox, MQTT.fx),或者MQTT客户端库(e.g paho.mqtt.c, paho.mqtt.java),订阅数据目的地mysink中设定的主题(实例=yyj, 主题=device/result)。

    推送数据

    流式计算已经在运行了,现在我们需要向数据源MQTT主题发布一些数据,以便我们能观察到其计算结果。以mysource设定的主题(实例=yyj, 主题=device/temp)为例,依次发布如下几条消息:

    {"ts": 1523427180, "deviceid": "device01", "temperature": 35.5}
    {"ts": 1523427181, "deviceid": "device02", "temperature": 40}
    {"ts": 1523427185, "deviceid": "device01", "temperature": 36.5}
    {"ts": 1523427187, "deviceid": "device02", "temperature": 42}
    {"ts": 1523427245, "deviceid": "device01", "temperature": 35.0}

    此时,应该从device/result主题收到2条(device01和device02各一条)计算结果,如下:

    {"ts_start": 1523427180000, "deviceid": "device01", "avg_temperature": 36.0}
    {"ts_start": 1523427180000, "deviceid": "device02", "avg_temperature": 41}

    说明:

    上述结果为前面4条输入消息的计算结果,第5条输入消息需待更多的消息输入才能有输出。1523427180000为一个分钟窗口的开始时间(Tumble_Start),单位毫秒。

    上一篇
    操作指南
    下一篇
    基本概念