SQL示例
更新时间:2019-06-14
固定窗口
-- Tumbling (fixed, non-overlapping) window: one output row per device per
-- 5-minute window, carrying the window start time and the average temperature
-- of the reports that fell into that window.
INSERT INTO mysink
SELECT
    TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS ts,  -- window start time
    deviceid,
    AVG(temperature) AS avg_temp
FROM mysource
GROUP BY
    deviceid,
    TUMBLE(rowtime, INTERVAL '5' MINUTE)
该SQL以5分钟(INTERVAL '5' MINUTE)为固定窗口,统计每个设备(deviceid)在每个窗口内的平均温度(temperature),并输出窗口的起始时间(ts)。
滑动窗口
-- Sliding (hopping) window: HOP(time_attr, slide, size) -- the first interval is
-- how often a window starts (every 1 minute), the second is how long each
-- window is (5 minutes), so consecutive windows overlap by 4 minutes.
INSERT INTO mysink
SELECT
    HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS ts,  -- window start time
    deviceid,
    COUNT(deviceid) AS cnt  -- number of messages from this device in the window
FROM mysource
GROUP BY
    deviceid,
    HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
该SQL每分钟统计一次每个设备在过去5分钟内上报消息的条数。例如,10:09分统计[10:04, 10:09)这5分钟的数据,10:10分统计[10:05, 10:10)这5分钟的数据;输出的ts为窗口的起始时间(HOP_START)。
会话窗口
-- Session window: a device's session (a "shift") stays open while reports keep
-- arriving and closes once the device has been silent for 30 minutes.
-- The output columns (`timestamp`, metric, deviceid, `value`) are shaped for
-- writing into TSDB; `timestamp` and `value` are backquoted because they are
-- reserved words.
INSERT INTO mysink
SELECT
    SESSION_START(rowtime, INTERVAL '30' MINUTE) AS `timestamp`,  -- session start time
    'device_output' AS metric,                                    -- constant metric name
    deviceid,
    SUM(val) AS `value`                                           -- total output in the session
FROM mysource
GROUP BY
    deviceid,
    SESSION(rowtime, INTERVAL '30' MINUTE)
统计每个设备每个班次的总产量(val),相邻两个班次之间至少间隔30分钟:当设备超过30分钟没有上报数据时,即认为当前班次结束。输出的格式(timestamp、metric、deviceid、value)可以直接写入TSDB。
两个数据流的Join查询
-- Interval join of two streams:
--   req (mysource1): device control commands, carrying the desired state.
--   ack (mysource2): success feedback, carrying the reported state.
-- A command matches feedback from the same device that reports the desired
-- state and arrives within 15 seconds of the command (both bounds inclusive,
-- since BETWEEN is closed on both ends).
-- Written as an explicit INNER JOIN ... ON instead of an implicit comma join,
-- so the join keys and the time bound cannot be silently dropped from WHERE.
INSERT INTO mysink
SELECT
    req.rowtime AS ts_req,
    -- NOTE(review): ack.rowtime is cast to a plain TIMESTAMP, presumably because
    -- the engine allows only one rowtime attribute in the result -- confirm.
    CAST(ack.rowtime AS TIMESTAMP) AS ts_ack,
    ack.deviceid
FROM mysource1 AS req
INNER JOIN mysource2 AS ack
    ON req.deviceid = ack.deviceid
    AND req.desired = ack.reported
    AND ack.rowtime BETWEEN req.rowtime AND req.rowtime + INTERVAL '15' SECOND
mysource1是下发设备控制命令的数据流(含期望状态desired),mysource2是设备操作成功后的反馈数据流(含上报状态reported)。
将控制流与反馈流按设备ID和状态进行join,提取每条控制命令下发后15秒内收到成功反馈的时间。
ROWS OVER窗口
-- ROWS OVER window: for every report, the maximum temperature across this
-- device's current report and its 2 preceding reports (3 rows in total),
-- emitted alongside the report's rowtime and deviceid.
INSERT INTO mysink
SELECT
    rowtime,
    deviceid,
    MAX(temperature) OVER (
        PARTITION BY deviceid
        ORDER BY rowtime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS max_temperature
FROM mysource
针对每个设备上报的温度,计算该设备最近3次(包含本次)上报中的最大温度值,同时输出时间戳(rowtime)和设备ID(deviceid)。说明:如果有多个ROWS OVER窗口,各窗口的定义必须一致。
RANGE OVER窗口
-- RANGE OVER window: for every report, the minimum temperature across this
-- device's reports whose rowtime lies in the 5 seconds up to and including the
-- current report's rowtime, emitted alongside rowtime and deviceid.
INSERT INTO mysink
SELECT
    rowtime,
    deviceid,
    MIN(temperature) OVER (
        PARTITION BY deviceid
        ORDER BY rowtime
        RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
    ) AS min_temperature
FROM mysource
针对每个设备上报的温度,计算该设备最近5秒内(包含本次)上报的最小温度值,同时输出时间戳(rowtime)和设备ID(deviceid)。说明:如果有多个RANGE OVER窗口,各窗口的定义必须一致。
ROWS OVER窗口计算车辆每分钟行驶里程
-- Distance travelled per vehicle per minute, computed in three layers.
--   1. adjacent_extremes: over each pair of consecutive reports of a vehicle
--      (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), take the max and min of
--      latitude and longitude. All four OVER windows are identical, as the
--      engine requires.
--   2. segments: st_distance between the corners (lat_max, lng_max) and
--      (lat_min, lng_min) of the pair's bounding box. Both diagonals of that
--      box have (approximately) the same length, so this is the distance
--      between the two reports whichever direction the vehicle moved.
--      NOTE(review): assumes st_distance(lat1, lng1, lat2, lng2) returns the
--      distance between two points; units depend on st_distance -- confirm.
--      For a vehicle's first report the window holds one row, so max = min
--      and the segment distance is 0.
--   3. outer query: sum the segment distances per vehicle in 1-minute
--      tumbling windows.
INSERT INTO mysink
SELECT
    TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS ts,
    devid,
    SUM(distance) AS distance
FROM (
    SELECT
        rowtime,
        devid,
        st_distance(lat_max, lng_max, lat_min, lng_min) AS distance
    FROM (
        SELECT
            rowtime,
            devid,
            MAX(lat) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lat_max,
            MIN(lat) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lat_min,
            MAX(lng) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lng_max,
            MIN(lng) OVER (PARTITION BY devid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS lng_min
        FROM mysource
    ) AS adjacent_extremes
) AS segments
GROUP BY
    devid,
    TUMBLE(rowtime, INTERVAL '1' MINUTE)
每辆车不定时上报当前的经度、纬度、时间戳和车辆ID。为了统计每辆车每分钟的行驶里程:
- 先通过最内层的SELECT语句,利用ROWS OVER窗口,分别算出每辆车相邻两次上报的经度和纬度的最大值和最小值,即lat_max、lat_min、lng_max、lng_min。
- 再通过第二层的SELECT语句,计算当前点与上一个点之间的行驶里程,记为distance。
- 最后,在最外层的SELECT语句中,按1分钟的固定窗口对上一层算出的里程distance求和。
ROWS OVER窗口监控车辆超速
监控多辆汽车:如果有车辆的速度超过120,立即发送告警,报告超速车辆的ID、时间和当前车速;如果车速从120以上降低到120以下,立即解除告警,同样报告时间和当前车速。
-- Over-speed alerting: emit a row only when a vehicle's speed crosses the 120
-- threshold between two consecutive reports.
--   'Warning'   : previous speed <= 120 and current speed >  120
--   'Recovered' : previous speed >  120 and current speed <= 120
-- "Speeding" means strictly greater than 120 on BOTH sides of the test.
-- The earlier strict `< 120` / `< 120` pair skipped every transition through
-- exactly 120:
--   * 130 -> 120 -> 110 never emitted 'Recovered', so the alarm never cleared;
--   * 120 -> 130 never emitted 'Warning'.
-- NOTE(review): for a vehicle's first report the window holds only that row,
-- so pre_speed = speed and nothing is emitted even when that first speed is
-- above 120.
INSERT INTO mysink
SELECT
    devid,
    -- Within the filtered rows, speed > 120 exactly identifies a 'Warning'
    -- transition; every other surviving row is a 'Recovered' transition.
    CASE WHEN speed > 120 THEN 'Warning' ELSE 'Recovered' END AS message,
    rowtime AS ts,
    speed
FROM (
    SELECT
        -- The first value over (previous row, current row) is the previous
        -- report's speed. NOTE(review): first_value_str presumably returns a
        -- string (hence the CAST back to DOUBLE) -- confirm.
        CAST(first_value_str(speed) OVER (
            PARTITION BY devid
            ORDER BY rowtime
            ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
        ) AS DOUBLE) AS pre_speed,
        speed,
        devid,
        rowtime
    FROM mysource
) AS speed_pairs
WHERE (pre_speed <= 120 AND speed > 120)
   OR (pre_speed > 120 AND speed <= 120)
利用first_value_str函数提取同一车辆前一次上报的车速,判断前后两次车速是否跨越了120这一阈值(一次在120以上、一次在120以下)。如果前后两次车速都在120以下,或者都在120以上,则不输出任何消息。