所有文档

          百度流式计算 BSC

          物设备报警情况实时统计

          业务场景

          统计每个设备每分钟报警次数。

          业务描述

          用户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上的传感器大概每5秒采集并上传数据到IoT Hub。

          sensorId time status
          传感器ID 发送时间 是否报警,status值为1代表报警

          传感器分布在多个设备、多个厂区,用户在RDS还记录如下传感器、设备、厂区维表信息,如下:

          sensorId sensorType deviceId useTime
          传感器ID 传感器类型 设备ID 使用寿命

          计算逻辑如下:

          统计每个设备每分钟发生报警的次数,并将统计结果通过输出到下游的RDS,最终展示在可视化报表中。

          案例实现

          定义MQTT source表

          CREATE TABLE source_mqtt_table(
          sensorId STRING,
          time STRING,
          status INTEGER
          ) WITH(
          type = 'MQTT',
          brokerUrl = 'tcp://duig1nr.mqtt.iot.bj.baidubce.com:1883', --必填
          topic = 'sensor', --必填
          username = 'iotdemo', --必填
          password = 'iotdemo', --必填
          encode = 'JSON',
          connectionTimeout = '30', --非必填,访问超时设置,单位:s
          keepAliveInterval = '60', --非必填,规定时间段内不活动时连接会被断开,单位:s
          maxBatchMessageNum = 'Int.Max', --非必填,每个batch最大数据条数
          maxBatchMessageSize = 'Int.Max' --非必填,每个batch最大消息字节数
          );

          定义RDS source表

          CREATE TABLE source_rds_table(
          sensorId STRING,
          sensorType STRING,
          deviceId STRING,
          useTime INTEGER
          ) WITH(
          type = 'RDS',
          user = 'rdsdemo', --必填,数据库用户名
          password = 'rdsdemo', --必填,数据库访问密码
          url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds_test?useUnicode=true&characterEncoding=UTF8', --必填,jdbc访问RDS的url
          dbTable = 'test' --必填,数据表名称
          );

          定义RDS sink表

          CREATE TABLE sink_rds_table(
          deviceId STRING,
          time TIMESTAMP,
          nums INTEGER
          ) WITH(
          type = 'RDS',
          user = 'iotdemo', --必填,数据库用户名
          password = 'iotdemo11', --必填,数据库访问密码
          url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds?useUnicode=true&characterEncoding=UTF8', --必填,jdbc访问RDS的url
          dbTable = 'iotdemo' --必填,数据表名称
          );

          编写数据统计DML语句

          统计这一分钟内每个设备的报警次数。由于使用的是滚动窗口,也就意味着数据将在每分钟结束时候产出一份并写入到RDS。

          INSERT INTO
          sink_rds_table outputmode append
          SELECT
          source_rds_table.deviceId,
          CAST(FROM_UNIXTIME(CAST(source_mqtt_table.time AS LONG)) AS TIMESTAMP) AS time,
          count(*) AS nums
          FROM
          source_mqtt_table INNER JOIN source_rds_table ON source_mqtt_table.sensorId = source_rds_table.sensorId
          WHERE
          source_mqtt_table.status = 1
          GROUP BY
          window(time, "1 minute"),
          deviceId
          上一篇
          操作指南
          下一篇
          CDN 日志提取中转(ETL)