Rule Engine

    SQL Examples

    Tumbling Window

    INSERT INTO mysink 
    SELECT 
        TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS ts,
        deviceid,
        AVG(temperature) AS avg_temp
    FROM mysource 
    GROUP BY 
        TUMBLE(rowtime, INTERVAL '5' MINUTE),
        deviceid

    This SQL computes the average temperature (temperature) of each device (deviceid) over consecutive, non-overlapping 5-minute windows (INTERVAL '5' MINUTE).
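
    To make the grouping concrete, here is a minimal Python sketch that emulates the query on an in-memory list. It is not how the engine is implemented. The function name tumbling_avg and the use of epoch seconds for rowtime are assumptions made for illustration.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # corresponds to INTERVAL '5' MINUTE


def tumbling_avg(events):
    """events: iterable of (epoch_seconds, deviceid, temperature) tuples.

    Returns {(window_start, deviceid): average temperature}.
    """
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, row count]
    for ts, deviceid, temperature in events:
        # Align the timestamp to its window start (the role of TUMBLE_START).
        window_start = ts - ts % WINDOW_SECONDS
        acc = sums[(window_start, deviceid)]
        acc[0] += temperature
        acc[1] += 1
    return {key: total / count for key, (total, count) in sums.items()}


events = [(0, "d1", 20.0), (120, "d1", 22.0), (299, "d2", 30.0), (300, "d1", 26.0)]
print(tumbling_avg(events))
# {(0, 'd1'): 21.0, (0, 'd2'): 30.0, (300, 'd1'): 26.0}
```

    The reading at second 300 opens a new window because each window covers [start, start + 5 minutes).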

    Sliding Window

    INSERT INTO mysink 
    SELECT 
        HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS ts, 
        deviceid,
        COUNT(deviceid) AS cnt
    FROM mysource
    GROUP BY
        HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE),
        deviceid

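    Once per minute, this SQL counts the messages each device sent during the past 5 minutes. At 10:09 it counts the data in [10:04, 10:09), a 5-minute span; at 10:10 it counts the data in [10:05, 10:10).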
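
    Because the windows overlap, a single message is counted in several of them. The following Python sketch lists the windows a given timestamp belongs to. It is an illustration only; the function name hop_window_starts and the epoch-second timestamps are assumptions.

```python
SLIDE = 60      # INTERVAL '1' MINUTE: a new window starts every minute
SIZE = 5 * 60   # INTERVAL '5' MINUTE: each window spans five minutes


def hop_window_starts(ts):
    """Return the start times of all windows [start, start + SIZE) that contain ts."""
    starts = []
    start = ts - ts % SLIDE      # latest window start that is <= ts
    while start > ts - SIZE:     # the window must still cover ts
        starts.append(start)
        start -= SLIDE
    return sorted(starts)


# Taking second 0 as 10:00:00, second 450 is 10:07:30.
print(hop_window_starts(450))
# [180, 240, 300, 360, 420]
```

    The message at 10:07:30 falls into the five windows starting at 10:03, 10:04, 10:05, 10:06 and 10:07, so it contributes to five consecutive results.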

    Session Window

    INSERT INTO mysink
    SELECT 
        SESSION_START(rowtime, INTERVAL '30' MINUTE) AS `timestamp`,
        'device_output' AS metric,
        deviceid,
        SUM(val) AS `value`
    FROM mysource
    GROUP BY 
        SESSION(rowtime, INTERVAL '30' MINUTE),
        deviceid

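    This SQL computes the total output (val) of each device per shift, where consecutive shifts are separated by a gap of at least 30 minutes. When a device reports no data for more than 30 minutes, the current shift is considered finished. The output format can be written directly to TSDB.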
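
    The sketch below emulates the session grouping in plain Python. It is a simplified model, not the engine's implementation: the name session_totals is hypothetical, timestamps are epoch seconds, and treating a gap of exactly 30 minutes as part of the same shift is an assumption.

```python
GAP = 30 * 60  # INTERVAL '30' MINUTE


def session_totals(events):
    """events: (epoch_seconds, deviceid, val) tuples in any order.

    Returns a list of (session_start, deviceid, total) rows.
    """
    sessions = []       # each entry: [start, last_seen, deviceid, total]
    open_session = {}   # deviceid -> index of that device's current session
    for ts, deviceid, val in sorted(events):
        idx = open_session.get(deviceid)
        # Assumption: a gap of exactly GAP still belongs to the same session.
        if idx is not None and ts - sessions[idx][1] <= GAP:
            sessions[idx][1] = ts
            sessions[idx][3] += val
        else:
            open_session[deviceid] = len(sessions)
            sessions.append([ts, ts, deviceid, val])
    return [(start, dev, total) for start, _last, dev, total in sessions]


events = [(0, "d1", 5), (600, "d1", 7), (3000, "d1", 1), (100, "d2", 2)]
print(session_totals(events))
# [(0, 'd1', 12), (100, 'd2', 2), (3000, 'd1', 1)]
```

    Device d1 is silent for 40 minutes between seconds 600 and 3000, so its third reading starts a new shift.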

    Join Query on Two Data Streams

    INSERT INTO mysink
    SELECT 
        req.rowtime AS ts_req,
        CAST(ack.rowtime AS TIMESTAMP) AS ts_ack,
        ack.deviceid
    FROM 
        mysource1 req, 
        mysource2 ack
    WHERE 
        req.deviceid = ack.deviceid 
        AND req.desired = ack.reported 
        AND ack.rowtime BETWEEN req.rowtime AND req.rowtime + INTERVAL '15' SECOND

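    mysource1 is the stream of device control commands that were issued (desired); mysource2 is the stream of successful-operation feedback from devices (reported).

    The query joins the control stream with the feedback stream and, for each control command acknowledged as successful within 15 seconds of being issued, outputs the time of the command and the time of the acknowledgement.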
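
    The matching rule can be sketched in Python as a nested loop over two in-memory lists. This is an illustration only: the function name join_requests_with_acks and the epoch-second timestamps are assumptions, and a real streaming join would not scan all rows this way.

```python
MAX_DELAY = 15  # INTERVAL '15' SECOND


def join_requests_with_acks(requests, acks):
    """requests: (ts, deviceid, desired); acks: (ts, deviceid, reported).

    Returns (ts_req, ts_ack, deviceid) for every matching pair.
    """
    matches = []
    for req_ts, req_dev, desired in requests:
        for ack_ts, ack_dev, reported in acks:
            same_device = req_dev == ack_dev
            same_state = desired == reported
            # BETWEEN is inclusive on both ends.
            in_time = req_ts <= ack_ts <= req_ts + MAX_DELAY
            if same_device and same_state and in_time:
                matches.append((req_ts, ack_ts, ack_dev))
    return matches


requests = [(100, "d1", "on"), (200, "d1", "off")]
acks = [(108, "d1", "on"), (230, "d1", "off"), (115, "d2", "on")]
print(join_requests_with_acks(requests, acks))
# [(100, 108, 'd1')]
```

    The second command is acknowledged after 30 seconds and the third acknowledgement comes from another device, so neither produces an output row.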

    ROWS OVER Window

    INSERT INTO mysink 
    SELECT 
        rowtime, 
        deviceid,
        MAX(temperature) OVER ( 
        PARTITION BY deviceid 
        ORDER BY rowtime 
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS max_temperature 
    FROM mysource

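    For each temperature reported by a device, this SQL computes the maximum temperature over that device's last 3 reports (including the current one) and outputs it together with the timestamp (rowtime) and device ID (deviceid). Note: if a query contains several ROWS OVER windows, all of them must use the same window definition.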
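
    A count-based window of this kind behaves like a fixed-length buffer per device. The Python sketch below illustrates the idea; the function name rolling_max is hypothetical and the input is assumed to arrive in rowtime order.

```python
from collections import defaultdict, deque


def rolling_max(events, rows=3):
    """events: (rowtime, deviceid, temperature) tuples in rowtime order.

    rows=3 corresponds to ROWS BETWEEN 2 PRECEDING AND CURRENT ROW.
    """
    recent = defaultdict(lambda: deque(maxlen=rows))  # per-device buffer
    out = []
    for rowtime, deviceid, temperature in events:
        recent[deviceid].append(temperature)  # oldest value drops out automatically
        out.append((rowtime, deviceid, max(recent[deviceid])))
    return out


events = [(1, "d1", 20), (2, "d1", 25), (3, "d1", 22), (4, "d1", 21), (5, "d1", 19)]
print([row[2] for row in rolling_max(events)])
# [20, 25, 25, 25, 22]
```

    One output row is produced per input row, unlike the group windows above, which emit one row per window.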

    RANGE OVER Window

    INSERT INTO mysink 
    SELECT 
        rowtime,  
        deviceid, 
        MIN(temperature) OVER ( 
        PARTITION BY deviceid 
        ORDER BY rowtime 
        RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW) AS min_temperature 
    FROM mysource

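    For each temperature reported by a device, this SQL computes the minimum temperature that device reported during the preceding 5 seconds (including the current report) and outputs it together with the timestamp (rowtime) and device ID (deviceid). Note: if a query contains several RANGE OVER windows, all of them must use the same window definition.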
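
    Here the window is bounded by time rather than by row count, so the number of rows it holds varies. A minimal Python sketch, assuming epoch-second timestamps, in-order input with distinct timestamps per device, and a hypothetical function name range_min:

```python
from collections import defaultdict, deque


def range_min(events, seconds=5):
    """events: (rowtime, deviceid, temperature) tuples in rowtime order.

    Emulates RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW.
    """
    history = defaultdict(deque)  # per-device (rowtime, temperature) pairs
    out = []
    for rowtime, deviceid, temperature in events:
        window = history[deviceid]
        window.append((rowtime, temperature))
        # Drop rows older than the lower bound; the bound itself is inclusive.
        while window[0][0] < rowtime - seconds:
            window.popleft()
        out.append((rowtime, deviceid, min(t for _, t in window)))
    return out


events = [(0, "d1", 30), (3, "d1", 28), (5, "d1", 29), (9, "d1", 31), (20, "d1", 35)]
print([row[2] for row in range_min(events)])
# [30, 28, 28, 29, 35]
```

    At second 20 no earlier report lies within 5 seconds, so the window contains only the current row.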

    Using a ROWS OVER Window to Compute Vehicle Mileage per Minute

    INSERT INTO mysink 
    SELECT 
        TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS ts,
        devid,
        SUM(distance) AS distance 
    FROM (
        SELECT 
            rowtime, 
            devid, 
            st_distance(lat_max, lng_max, lat_min, lng_min) AS distance
        FROM (
            SELECT 
                rowtime, 
                devid, 
                MAX(lng) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lng_max, 
                MIN(lng) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lng_min, 
                MAX(lat) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lat_max, 
                MIN(lat) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lat_min 
            FROM mysource
            )
        )
    GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), devid

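    Each vehicle reports its current longitude and latitude at irregular intervals, together with a timestamp and the vehicle ID. To compute the distance each vehicle travels per minute:

    1. The innermost SELECT uses ROWS OVER windows to compute, for each vehicle, the maximum and minimum longitude and latitude over the previous and current reports, namely lat_max, lat_min, lng_max and lng_min.
    2. The second SELECT computes the distance travelled from the previous point to the current point, recorded as distance.
    3. Finally, the outermost SELECT sums the distance values from the previous step over a 1-minute tumbling window.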
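
    The three steps above can be sketched in Python as follows. This is an illustration under stated assumptions: the haversine formula in metres stands in for st_distance, whose actual formula and unit are not given here, and the names haversine_m and mileage_per_minute are hypothetical. Timestamps are epoch seconds and input is assumed to be in time order.

```python
import math
from collections import defaultdict

EARTH_RADIUS_M = 6371000.0  # mean Earth radius, used by the stand-in distance


def haversine_m(lat1, lng1, lat2, lng2):
    """Great-circle distance in metres (assumed stand-in for st_distance)."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = p2 - p1
    dlmb = math.radians(lng2 - lng1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def mileage_per_minute(events):
    """events: (epoch_seconds, devid, lat, lng) tuples in time order."""
    prev = {}                    # devid -> previous (lat, lng)
    totals = defaultdict(float)  # (window_start, devid) -> summed distance
    for ts, devid, lat, lng in events:
        # First report of a vehicle: the window holds only the current row.
        p_lat, p_lng = prev.get(devid, (lat, lng))
        # Step 1: max/min of the previous and current coordinates.
        lat_max, lat_min = max(lat, p_lat), min(lat, p_lat)
        lng_max, lng_min = max(lng, p_lng), min(lng, p_lng)
        # Step 2: distance between the two corner points.
        distance = haversine_m(lat_max, lng_max, lat_min, lng_min)
        # Step 3: sum per 1-minute tumbling window, keyed by the current row's time.
        totals[(ts - ts % 60, devid)] += distance
        prev[devid] = (lat, lng)
    return dict(totals)


events = [(0, "car1", 0.0, 0.0), (30, "car1", 0.0, 0.001), (70, "car1", 0.0, 0.002)]
print(mileage_per_minute(events))
```

    Each segment is attributed to the minute in which its end point was reported, which mirrors how the outer query groups on the rowtime of the current row.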

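    Using a ROWS OVER Window to Monitor Vehicle Speeding

    This example monitors multiple vehicles. If a vehicle's speed exceeds 120, an alert is sent immediately, reporting which vehicle is speeding along with the time and the current speed. If the speed drops from above 120 back within the limit, the alert is cleared immediately, again reporting the time and the current speed.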

    INSERT INTO mysink 
    SELECT 
        devid, 
        CASE pre_speed > speed WHEN true THEN 'Recovered' ELSE 'Warning' END  AS message,
        rowtime AS ts,
        speed
    FROM (
            SELECT 
                CAST(first_value_str(speed) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS DOUBLE) AS pre_speed,
                speed,
                devid,
                rowtime 
            FROM mysource
        ) 
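    WHERE (pre_speed <= 120 AND speed > 120)
        OR (pre_speed > 120 AND speed <= 120)

    The first_value_str function retrieves the previously reported speed, and the WHERE clause checks whether the speed crossed the 120 threshold between the previous and current reports. A reading of exactly 120 is treated as not speeding, so a vehicle whose speed passes through exactly 120 still triggers the alert or the recovery. If both readings are on the same side of the threshold, no message is output.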
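
    The threshold-crossing logic can be sketched in Python as shown below. It is an illustration rather than the engine's behavior: the function name speed_alerts is hypothetical, input is assumed to be in rowtime order, and a reading of exactly 120 is assumed to count as not speeding.

```python
LIMIT = 120


def speed_alerts(events):
    """events: (rowtime, devid, speed) tuples in rowtime order.

    Returns (devid, message, rowtime, speed) rows for each threshold crossing.
    """
    prev = {}  # devid -> previously reported speed
    out = []
    for rowtime, devid, speed in events:
        # First report of a vehicle: the previous speed equals the current one.
        pre_speed = prev.get(devid, speed)
        if pre_speed <= LIMIT < speed:
            out.append((devid, "Warning", rowtime, speed))
        elif speed <= LIMIT < pre_speed:
            out.append((devid, "Recovered", rowtime, speed))
        prev[devid] = speed
    return out


events = [
    (1, "car1", 100), (2, "car1", 120), (3, "car1", 130),
    (4, "car1", 140), (5, "car1", 110), (6, "car2", 125),
]
print(speed_alerts(events))
# [('car1', 'Warning', 3, 130), ('car1', 'Recovered', 5, 110)]
```

    Because the first report of a vehicle has no predecessor, a vehicle that is already above 120 when it first appears (car2 here) produces no alert until its speed crosses the threshold.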
