规则引擎Rule Engine

    格式

    流式计算支持3种消息格式,JSON,CSV和一般性文本(如web日志)。

    JSON

    在定义数据源的时候,需要指定消息中key的名字,和它的数据类型。对于不参与计算的key,不需要指定。

    对于如下格式的消息:

    {
        "ts": 1523427180,
        "deviceid": "device01",
        "temperature": 35.5
    }

    其对应的字段名称和类型分别如下:

    字段名 类型
    ts SQL_TIMESTAMP
    deviceid STRING
    temperature DOUBLE

    如果实际消息中,指定的字段不存在,则计算结果为null。但时间戳字段不能为null。

    嵌套JSON(输入)

    对于带嵌套的JSON格式,需要用点(.)连接该字段完整的路径,如data.deviceid。例如,对于如下格式消息:

    {
        "ts": 1523427180,
        "data": {
            "deviceid": "device01",
            "temperature": 35.5
        }
    }

    其对应的字段名称和类型分别如下:

    字段名 类型
    ts SQL_TIMESTAMP
    data.deviceid STRING
    data.temperature DOUBLE

    在SQL中引用字段,用最后一部分路径,即deviceid, temperature等。例如:

    INSERT INTO mysink 
    SELECT 
        deviceid, 
        MAX(temperature) AS max_temp
    FROM mysource 
    GROUP BY 
        Tumble(rowtime, INTERVAL '5' SECOND), 
        deviceid

    如果定义的嵌套的字段,最后一部分路径名称存在冲突,可以在字段名称中通过AS关键词进行重命名。例如,对于如下格式消息:

    {
        "ts": 1523427180,
        "data": {
            "deviceid": "device01"
        },
        "data2": {
            "deviceid": "device02"
        }
    }

    data和data2下面都包含名为deviceid的字段,为了在SQL中明确地引用某个deviceid,可以如下定义字段:

    字段名 类型
    ts SQL_TIMESTAMP
    data.deviceid AS device1 STRING
    data2.deviceid AS device2 STRING

    这样,SELECT device1是引用data.deviceid;SELECT device2是引用data2.deviceid。

    JSON数组

    如果需要处理的数据在数组中,则需要把数组字段本身定义成ARRAY类型,然后依次定义数组内部各个字段类型。举个例子,假如消息格式如下:

    {
        "ts": 1523427180,
        "data": [{
            "deviceid": "device01",
            "temperature": 35.5
        },
        {
            "deviceid": "device02",
            "temperature": 38.0
        }]
    }

    其对应的字段名称和类型分别如下:

    字段名 类型
    ts SQL_TIMESTAMP
    data ARRAY
    data.deviceid STRING
    data.temperature DOUBLE

    在SQL中处理数组中的数据,需要通过UNNEST语句将数组展开,如:

    INSERT INTO mysink
    SELECT 
        Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts,
        deviceid,
        AVG(temperature) AS avg_temp
    FROM 
        mysource, 
        UNNEST(mysource.data) AS A (deviceid, temperature) 
    GROUP BY 
        Tumble(rowtime, INTERVAL '1' MINUTE), 
        deviceid

    JSON数组嵌套

    如果需要处理的数据在数组中,并且该数组嵌套在另外一个数组之内,则需要把两级数组字段本身定义成ARRAY类型,然后依次定义数组内部各个字段类型。举个例子,假如消息格式如下:

    {
        "ts": 1523427180,
        "data": [{
            "info": [{
                "devcieid": "device01",
                "temperature": 35.5
            }]
        }]
    }

    其对应的字段名称和类型分别如下:

    字段名 类型
    ts SQL_TIMESTAMP
    data ARRAY
    data.info ARRAY
    data.info.deviceid STRING
    data.info.temperature DOUBLE

    在SQL中处理数组中的数据,需要通过两次UNNEST语句将数组展开,如:

    INSERT INTO mysink
    SELECT 
        Tumble_Start(rowtime, INTERVAL '1' MINUTE) AS ts, 
        deviceid, 
        AVG(temperature) AS avg_temp
    FROM 
        mysource,
        UNNEST(mysource.data) AS A (arr1),
        UNNEST(A.arr1) AS B(deviceid, temperature)
    GROUP BY
        Tumble(rowtime, INTERVAL '1' MINUTE),
        deviceid

    嵌套JSON(输出)

    如需写到数据目的地中的JSON数据包含嵌套结构,可以通过如下AS语法实现:

    SELECT val AS `data.val`

    上述语句输出的JSON格式如下:

    {
    	"data": {
    		"val":1.2
    	}
    }

    注意,AS后面的JSON路径需要用重音符(`)引用起来。下面的例子,演示了如何通过输出嵌套的JSON,以达到写TSDB多域的目的:

    INSERT INTO yyj_tsdb_sink
    SELECT 
        ts,
        deviceid,
        lng,
        lat,
        'location' AS metric,
        'ts' AS `_TSDB_META_v3.ts`,
        'metric' AS `_TSDB_META_v3.metric`,
        'deviceid' AS `_TSDB_META_v3.tags.tag1`,
        'lng' AS `_TSDB_META_v3.fields.field1`,
        'lat' AS `_TSDB_META_v3.fields.field2`
     FROM yyj_mqtt_src

    CSV

    CSV是以英文逗号(或者自定义分隔符)分隔的数据,字段定义的顺序为数据的顺序。例如消息为:

    1523427180,device01,35.5

    其合理的字段的定义如下:

    字段名 类型
    ts SQL_TIMESTAMP
    deviceid STRING
    temperature DOUBLE

    TXT_BY_GROK(仅适用数据源)

    通过一个Grok Filter的pattern来将用户的原始文本消息解析成格式化后的数据,然后再参与计算。例如通过百度日志服务(BLS)将Web日志采集到Kafka后,可以通过该方式解析然后计算。

    该格式需要另外还指定一个pattern,例如:%{COMMONAPACHELOG_DATATYPED}。系统已经默认支持了常见的pattern,具体的列表,参考patterns.txt

    当填写完pattern后,系统自动解析并且提取每个字段。对于不需要参与计算的字段,可以删除掉,以产生不必要的计算开销。

    上一篇
    数据源和目的地
    下一篇
    字段类型