物联网设备实时监控预警
所有文档

          百度流式计算 BSC

          物联网设备实时监控预警

          业务场景

          监控、预警工厂设备的用电情况。

          业务描述

          用户拥有大量的大功率设备,如果没有在下班之前及时关闭,会造成用电浪费,甚至引起重大安全事故。每个设备上的传感器定时(5~30秒不等)将设备当前的情况推送到 物联网核心套件(IoT Core)或 物接入(IoT Hub) 的 MQTT 当中作为 source,在我们 BSC 中创建 FLINK_STREAM/SQL 类型的作业用于设备关键信息的提取,并实时将处理结果推送到 物联网核心套件(IoT Core)或 物接入(IoT Hub) 的 MQTT 当中,方便下游 规则引擎(Rule Engine) 和 时序时空数据库(TSDB)消费。用户可以基于 智能小程序 开发小程序或第三方平台调用 TSDB 的数据 API,并完成数据展示、历史数据分析、故障预警等功能。可以有效发现安全隐患、及时更换老旧设备、发现异常用电情况,为工厂运转节省成本、提升安全系数。

          处理流程

          用户设备 → IoT Hub → BSC → IoT Hub → Rule Engine → TSDB → 小程序

          案例实现

          一个完整的 Flink SQL 作业由 source 表、sink 表和 DML 语句构成。

          定义 MQTT Source 表

          SET job.stream.timeType = 'PROCESSTIME'; -- 设置 PROCTIME
          CREATE TABLE source_mqtt_table (
                  `modbus` ROW(
                      `request` ROW(
                          `startAddr` BIGINT,
                          `length` BIGINT
                      ),
                      `response` STRING,
                      `parsedResponse` ARRAY < ROW(
                          `desc` STRING,
                          `type` STRING,
                          `unit` STRING,
                          `value` STRING,
                          `errno` BIGINT
                      ) >
                  )
                  `metrics` ROW(
                      `Settingtime_m` BIGINT,
                      `Building` BIGINT,
                      `Floor` BIGINT,
                      `Company` BIGINT,
                      `Equipment` BIGINT,
                      `C_Temperature` DOUBLE,
                      `S_Temperature` BIGINT,
                      `Cabinet` BIGINT,
                      `Runningtime_m` BIGINT,
                      `Runningtime_h` BIGINT,
                      `Settingtime_h` BIGINT
                  )
              ) WITH (
                  'connector.type' = 'MQTT',
                  'format.encode' = 'JSON',
                  'connector.url' = 'tcp://xxxxxxxxxx.mqtt.iot.gz.baidubce.com:1883',
                  'connector.topic' = 'Device1',
                  'connector.username' = 'xxxxxxxxxx/yyyyyy',
                  'connector.password' = 'xxxxxxxx',
                  'connector.semantic' = 'at_least_once'
              );

          定义 MQTT Sink 表

          CREATE TABLE sink_mqtt_table (
                  `field` STRING,
                  `timestamp` BIGINT,
                  `value` DOUBLE,
                  `Company` BIGINT,
                  `Building` BIGINT,
                  `Floor` BIGINT,
                  `Cabinet` BIGINT,
                  `Equipment` BIGINT
              ) WITH (
                  'connector.type' = 'MQTT',
                  'format.encode' = 'JSON',
                  'connector.url' = 'tcp://xxxxxxx.mqtt.iot.gz.baidubce.com:1883',
                  'connector.topic' = 'Device1_Unnested',
                  'connector.username' = 'xxxxxx/yyyyy',
                  'connector.password' = 'xxxxxxxx'
              );

          编写数据提取DML语句

          解析 source 表中的复杂嵌套 json, 提取出设备的关键信息,如位置编号、运行状态,并使用 PROCTIME 来为输出结果添加一条记录

          INSERT INTO
              sink_mqtt_table
          SELECT
              `desc` AS field,
              TO_BIGINT(CURRENT_TIMESTAMP) AS `timestamp`,
              CAST(`value` AS DOUBLE) AS `value`,
              Company,
              Building,
              `Floor`,
              Cabinet,
              Equipment
          FROM
              source_mqtt_table,
              UNNEST(sink_mqtt_table.parsedResponse) AS A(`desc`, `type`, `unit`, `value`, `errno`)
          WHERE
              `desc` NOT IN (
                  'Company',
                  'Building',
                  'Floor',
                  'Cabinet',
                  'Equipment'
              )
          上一篇
          CDN 接口日志聚合统计
          下一篇
          API 日志调用统计