SQL示例

固定窗口

INSERT INTO mysink 
SELECT 
    TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS ts,
    deviceid,
    AVG(temperature) AS avg_temp
FROM mysource 
GROUP BY 
    TUMBLE(rowtime, INTERVAL '5' MINUTE),
    deviceid

该SQL统计每个设备(deviceid)5分钟(INTERVAL '5' MINUTE)的平均温度(temperature)。

滑动窗口

INSERT INTO mysink 
SELECT 
    HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS ts, 
    deviceid,
    COUNT(deviceid) AS cnt
FROM mysource
GROUP BY
    HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE),
    deviceid

该SQL每分钟统计一次每个设备过去5分钟消息的个数。10:09分统计[10:04-10:09)共5分钟的数据;10:10分统计[10:05-10:10)共5分钟的数据。

会话窗口

INSERT INTO mysink
SELECT 
    SESSION_START(rowtime, INTERVAL '30' MINUTE) AS `timestamp`,
    'device_output' AS metric,
    deviceid,
    SUM(val) AS `value`
FROM mysource
GROUP BY 
    SESSION(rowtime, INTERVAL '30' MINUTE),
    deviceid

统计每个设备每个班次(班次间隔为至少30分钟)的总产量(val)。当设备超过30分钟没有上报数据,则认为一个班次的结束。输出的格式可以直接写入TSDB。

两个数据流的Join查询

INSERT INTO mysink
SELECT 
    req.rowtime AS ts_req,
    CAST(ack.rowtime AS TIMESTAMP) AS ts_ack,
    ack.deviceid
FROM 
    mysource1 req, 
    mysource2 ack
WHERE 
    req.deviceid = ack.deviceid 
    AND req.desired = ack.reported 
    AND ack.rowtime BETWEEN req.rowtime AND req.rowtime + INTERVAL '15' SECOND

mysource1为发出设备控制命令的数据流(desired),mysource2为设备操作成功反馈的流(reported)。

将控制流和反馈流join,提取每次设备控制命令从下发后15秒之内收到反馈成功的时间。

ROWS OVER窗口

INSERT INTO mysink 
SELECT 
    rowtime, 
    deviceid,
    MAX(temperature) OVER ( 
    PARTITION BY deviceid 
    ORDER BY rowtime 
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS max_temperature 
FROM mysource

针对每个设备上报的温度,计算该设备前面3次(包含本次)上报的最大温度值,同时输出时间戳(rowtime)和设备ID(deviceid)。说明,如果有多个ROWS OVER窗口,每个窗口必须一致。

RANGE OVER窗口

INSERT INTO mysink 
SELECT 
    rowtime,  
    deviceid, 
    MIN(temperature) OVER ( 
    PARTITION BY deviceid 
    ORDER BY rowtime 
    RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW) AS min_temperature 
FROM mysource

针对每个设备上报的温度,计算该设备前面5秒钟(包含本次)上报的最小温度值,同时输出时间戳(rowtime)和设备ID(deviceid)。说明,如果有多个RANGE OVER窗口,每个窗口必须一致。

ROWS OVER窗口计算车辆每分钟行驶里程

INSERT INTO mysink 
SELECT 
    TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS ts,
    devid,
    SUM(distance) AS distance 
FROM (
    SELECT 
        rowtime, 
        devid, 
        st_distance(lat_max, lng_max, lat_min, lng_min) AS distance
    FROM (
        SELECT 
            rowtime, 
            devid, 
            MAX(lng) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lng_max, 
            MIN(lng) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lng_min, 
            MAX(lat) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lat_max, 
            MIN(lat) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lat_min 
        FROM mysource
        )
    )
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), devid

每个车辆不定时上报当前的经度和纬度,以及时间戳和车辆id。为了统计每辆车每分钟行驶里程:
1) 先通过最里面的SELECT语句,用ROWS OVER窗口,算出每辆车前后2次上报经纬度分别最大值和最小值, 即lat_max, lat_min, lng_max, lng_min。

2) 再通过第二个SELECT语句,计算当前点距离上一个点行驶的里程,记为distance。

3) 最后,通过最外层的SELECT语句,通过1分钟固定窗口,对上一层计算的里程值distance进行求和。

ROWS OVER窗口监控车辆超速

监控多辆汽车,如果有车速度超过120,立即发送告警,报告哪辆车超速,同时报告时间、当前车速。如果车速从120以上降低到120以下,立即解除告警,同时报告时间、当前车速。

INSERT INTO mysink 
SELECT 
    devid, 
    CASE pre_speed > speed WHEN true THEN 'Recovered' ELSE 'Warning' END  AS message,
    rowtime AS ts,
    speed
FROM (
        SELECT 
            CAST(first_value_str(speed) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS DOUBLE) AS pre_speed,
            speed,
            devid,
            rowtime 
        FROM mysource
    ) 
WHERE (pre_speed < 120 AND speed > 120) 
    or (pre_speed > 120 AND speed < 120)

利用first_value_str函数提取前面一次的车速,判断当前车速与前次车速是否分别大于和小于120。如果前后2次车速都小于120,或者都大于120,则不输出任何消息。