格式
所有文档

          规则引擎 Rule Engine

          格式

          流式计算支持3种消息格式,JSON,CSV和一般性文本(如web日志)。

          JSON

          在定义数据源的时候,需要指定消息中key的名字,和它的数据类型。对于不参与计算的key,不需要指定。

          对于如下格式的消息:

          {
              "ts": 1523427180,
              "deviceid": "device01",
              "temperature": 35.5
          }

          其对应的字段名称和类型分别如下:

          字段名 类型
          ts SQL_TIMESTAMP
          deviceid STRING
          temperature DOUBLE

          如果实际消息中,指定的字段不存在,则计算结果为null。但时间戳字段不能为null。

          嵌套JSON(输入)

          对于带嵌套的JSON格式,需要用点(.)连接该字段完整的路径,如data.deviceid。例如,对于如下格式消息:

          {
              "ts": 1523427180,
              "data": {
                  "deviceid": "device01",
                  "temperature": 35.5
              }
          }

          其对应的字段名称和类型分别如下:

          字段名 类型
          ts SQL_TIMESTAMP
          data.deviceid STRING
          data.temperature DOUBLE

          在SQL中引用字段,用最后一部分路径,即deviceid, temperature等。例如:

          INSERT INTO mysink 
          SELECT 
              deviceid, 
              MAX(temperature) AS max_temp
          FROM mysource 
          GROUP BY 
              Tumble(rowtime, INTERVAL '5' SECOND), 
              deviceid

          如果定义的嵌套的字段,最后一部分路径名称存在冲突,可以在字段名称中通过AS关键词进行重命名。例如,对于如下格式消息:

          {
              "ts": 1523427180,
              "data": {
                  "deviceid": "device01"
              },
              "data2": {
                  "deviceid": "device02"
              }
          }

          data和data2下面都包含名为deviceid的字段,为了在SQL中明确地引用某个deviceid,可以如下定义字段:

          字段名 类型
          ts SQL_TIMESTAMP
          data.deviceid AS device1 STRING
          data2.deviceid AS device2 STRING

          这样,SELECT device1是引用data.deviceid;SELECT device2是引用data2.deviceid。

          JSON数组

          如果需要处理的数据在数组中,则需要把数组字段本身定义成ARRAY类型,然后依次定义数组内部各个字段类型。举个例子,假如消息格式如下:

          {
              "ts": 1523427180,
              "data": [{
                  "deviceid": "device01",
                  "temperature": 35.5
              },
              {
                  "deviceid": "device02",
                  "temperature": 38.0
              }]
          }

          其对应的字段名称和类型分别如下:

          字段名 类型
          ts SQL_TIMESTAMP
          data ARRAY
          data.deviceid STRING
          data.temperature DOUBLE

          在SQL中处理数组中的数据,需要通过UNNEST语句将数组展开,如:

          INSERT INTO mysink
          SELECT 
              Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts,
              deviceid,
              AVG(temperature) AS avg_temp
          FROM 
              mysource, 
              UNNEST(mysource.data) AS A (deviceid, temperature) 
          GROUP BY 
              Tumble(rowtime, INTERVAL '1' MINUTE), 
              deviceid

          JSON数组嵌套

          如果需要处理的数据在数组中,并且该数组嵌套在另外一个数组之内,则需要把两级数组字段本身定义成ARRAY类型,然后依次定义数组内部各个字段类型。举个例子,假如消息格式如下:

          {
              "ts": 1523427180,
              "data": [{
                  "info": [{
                      "devcieid": "device01",
                      "temperature": 35.5
                  }]
              }]
          }

          其对应的字段名称和类型分别如下:

          字段名 类型
          ts SQL_TIMESTAMP
          data ARRAY
          data.info ARRAY
          data.info.deviceid STRING
          data.info.temperature DOUBLE

          在SQL中处理数组中的数据,需要通过两次UNNEST语句将数组展开,如:

          INSERT INTO mysink
          SELECT 
              Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts, 
              deviceid, 
              AVG(temperature) AS avg_temp
          FROM 
              mysource,
              UNNEST(mysource.data) AS A (arr1),
              UNNEST(A.arr1) AS B(deviceid, temperature)
          GROUP BY
              Tumble(rowtime, INTERVAL '1' MINUTE),
              deviceid

          嵌套JSON(输出)

          如需写到数据目的地中的JSON数据包含嵌套结构,可以通过如下AS语法实现:

          SELECT val AS `data.val`

          上述语句输出的JSON格式如下:

          {
          	"data": {
          		"val":1.2
          	}
          }

          注意,AS后面的JSON路径需要用重音符(`)引用起来。下面的例子,演示了如何通过输出嵌套的JSON,以达到写TSDB多域的目的:

          INSERT INTO yyj_tsdb_sink
          SELECT 
              ts,
              deviceid,
              lng,
              lat,
              'location' AS metric,
              'ts' AS `_TSDB_META_v3.ts`,
              'metric' AS `_TSDB_META_v3.metric`,
              'deviceid' AS `_TSDB_META_v3.tags.tag1`,
              'lng' AS `_TSDB_META_v3.fields.field1`,
              'lat' AS `_TSDB_META_v3.fields.field2`
           FROM yyj_mqtt_src

          CSV

          CSV是以英文逗号(或者自定义分隔符)分隔的数据,字段定义的顺序为数据的顺序。例如消息为:

          1523427180,device01,35.5

          其合理的字段的定义如下:

          字段名 类型
          ts SQL_TIMESTAMP
          deviceid STRING
          temperature DOUBLE

          TXT_BY_GROK(仅适用数据源)

          通过一个Grok Filter的pattern来将用户的原始文本消息解析成格式化后的数据,然后再参与计算。例如通过百度日志服务(BLS)将Web日志采集到Kafka后,可以通过该方式解析然后计算。

          该格式需要另外还指定一个pattern,例如:%{COMMONAPACHELOG_DATATYPED}。系统已经默认支持了常见的pattern,具体的列表,参考patterns.txt

          当填写完pattern后,系统自动解析并且提取每个字段。对于不需要参与计算的字段,可以删除掉,以产生不必要的计算开销。

          上一篇
          数据源和目的地
          下一篇
          字段类型