格式
流式计算支持3种消息格式,JSON,CSV和一般性文本(如web日志)。
JSON
在定义数据源的时候,需要指定消息中key的名字,和它的数据类型。对于不参与计算的key,不需要指定。
对于如下格式的消息:
{
"ts": 1523427180,
"deviceid": "device01",
"temperature": 35.5
}
其对应的字段名称和类型分别如下:
字段名 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
deviceid | STRING |
temperature | DOUBLE |
如果实际消息中,指定的字段不存在,则计算结果为null。但时间戳字段不能为null。
嵌套JSON(输入)
对于带嵌套的JSON格式,需要用点(.)连接该字段完整的路径,如data.deviceid。例如,对于如下格式消息:
{
"ts": 1523427180,
"data": {
"deviceid": "device01",
"temperature": 35.5
}
}
其对应的字段名称和类型分别如下:
字段名 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
data.deviceid | STRING |
data.temperature | DOUBLE |
在SQL中引用字段,用最后一部分路径,即deviceid, temperature等。例如:
INSERT INTO mysink
SELECT
deviceid,
MAX(temperature) AS max_temp
FROM mysource
GROUP BY
Tumble(rowtime, INTERVAL '5' SECOND),
deviceid
如果定义的嵌套的字段,最后一部分路径名称存在冲突,可以在字段名称中通过AS关键词进行重命名。例如,对于如下格式消息:
{
"ts": 1523427180,
"data": {
"deviceid": "device01"
},
"data2": {
"deviceid": "device02"
}
}
data和data2下面都包含名为deviceid的字段,为了在SQL中明确地引用某个deviceid,可以如下定义字段:
字段名 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
data.deviceid AS device1 | STRING |
data2.deviceid AS device2 | STRING |
这样,SELECT device1是引用data.deviceid;SELECT device2是引用data2.deviceid。
JSON数组
如果需要处理的数据在数组中,则需要把数组字段本身定义成ARRAY类型,然后依次定义数组内部各个字段类型。举个例子,假如消息格式如下:
{
"ts": 1523427180,
"data": [{
"deviceid": "device01",
"temperature": 35.5
},
{
"deviceid": "device02",
"temperature": 38.0
}]
}
其对应的字段名称和类型分别如下:
字段名 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
data | ARRAY |
data.deviceid | STRING |
data.temperature | DOUBLE |
在SQL中处理数组中的数据,需要通过UNNEST语句将数组展开,如:
INSERT INTO mysink
SELECT
Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts,
deviceid,
AVG(temperature) AS avg_temp
FROM
mysource,
UNNEST(mysource.data) AS A (deviceid, temperature)
GROUP BY
Tumble(rowtime, INTERVAL '1' MINUTE),
deviceid
JSON数组嵌套
如果需要处理的数据在数组中,并且该数组嵌套在另外一个数组之内,则需要把两级数组字段本身定义成ARRAY类型,然后依次定义数组内部各个字段类型。举个例子,假如消息格式如下:
{
"ts": 1523427180,
"data": [{
"info": [{
"devcieid": "device01",
"temperature": 35.5
}]
}]
}
其对应的字段名称和类型分别如下:
字段名 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
data | ARRAY |
data.info | ARRAY |
data.info.deviceid | STRING |
data.info.temperature | DOUBLE |
在SQL中处理数组中的数据,需要通过两次UNNEST语句将数组展开,如:
INSERT INTO mysink
SELECT
Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts,
deviceid,
AVG(temperature) AS avg_temp
FROM
mysource,
UNNEST(mysource.data) AS A (arr1),
UNNEST(A.arr1) AS B(deviceid, temperature)
GROUP BY
Tumble(rowtime, INTERVAL '1' MINUTE),
deviceid
嵌套JSON(输出)
如需写到数据目的地中的JSON数据包含嵌套结构,可以通过如下AS语法实现:
SELECT val AS `data.val`
上述语句输出的JSON格式如下:
{
"data": {
"val":1.2
}
}
注意,AS后面的JSON路径需要用重音符(`)引用起来。下面的例子,演示了如何通过输出嵌套的JSON,以达到写TSDB多域的目的:
INSERT INTO yyj_tsdb_sink
SELECT
ts,
deviceid,
lng,
lat,
'location' AS metric,
'ts' AS `_TSDB_META_v3.ts`,
'metric' AS `_TSDB_META_v3.metric`,
'deviceid' AS `_TSDB_META_v3.tags.tag1`,
'lng' AS `_TSDB_META_v3.fields.field1`,
'lat' AS `_TSDB_META_v3.fields.field2`
FROM yyj_mqtt_src
CSV
CSV是以英文逗号(或者自定义分隔符)分隔的数据,字段定义的顺序为数据的顺序。例如消息为:
1523427180,device01,35.5
其合理的字段的定义如下:
字段名 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
deviceid | STRING |
temperature | DOUBLE |
TXT_BY_GROK(仅适用数据源)
通过一个Grok Filter的pattern来将用户的原始文本消息解析成格式化后的数据,然后再参与计算。例如通过百度日志服务(BLS)将Web日志采集到Kafka后,可以通过该方式解析然后计算。
该格式需要另外还指定一个pattern,例如:%{COMMONAPACHELOG_DATATYPED}。系统已经默认支持了常见的pattern,具体的列表,参考patterns.txt。
当填写完pattern后,系统自动解析并且提取每个字段。对于不需要参与计算的字段,可以删除掉,以产生不必要的计算开销。