规则引擎Rule Engine

    数据源和目的地

    数据源是流式计算的输入,数据目的地是流式计算的输出。目前支持如下几种数据源和目的地。

    物接入(MQTT)

    消息来自于或者发往物接入,需指定实例名和MQTT主题。

    百度消息服务(Kafka)

    消息来自或者发往百度消息服务,需指定Kafka主题。

    时序数据库(TSDB)

    作为目的地

    计算结果直接写入指定的TSDB。时间戳(timestamp)、度量(metric)、值(value)还有标签(tag)需在结果消息里面指定。即输出的消息需包含如下几个字段:

    • timestamp
    • metric
    • value
    • 一个或者多个标签(tag)

    下面是一个目的地为TSDB的SQL举例:

    INSERT INTO my_tsdb_sink 
    SELECT 
        TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS `timestamp`,
        deviceid, 
        AVG(temperature) AS `value`,
        'devicetemperature' AS metric
    FROM mysource 
    GROUP BY 
        TUMBLE(rowtime, INTERVAL '5' MINUTE),
        deviceid

    其中,timestamp, value均为SQL关键字,因此需要用重音符(`)引起来。deviceid为一个tag写进TSDB。

    作为数据源

    选择某个DB的某个度量(metric)的数据作为输入,并且可以处理已经存在的历史数据。系统会自动探测metric下所有的域和tag,并且自动生成各个字段。如果有遗漏的域或者tag,可以在定义数据源时,手动添加相关的字段。

    对于名为x的域,添加的字段名应该为:fields.x,类型为相应的类型;对于名为y的tag,添加的字段名为tags.y,字段类型为STRING。

    举例,假设下面是某个度量下面数据的定义:

    属性名称 类型
    ts SQL_TIMESTAMP
    fields.value DOUBLE
    tags.device STRING

    可以类似如下在SQL中直接引用各个域和tag字段:

    insert into mysink
    select 
    	tumble_start(rowtime, INTERVAL '30' SECOND) as `timestamp`,
    	sum(`value`) as `value`,
    	device
    from mysource
    group by tumble(rowtime,  INTERVAL '30' SECOND),
    	device

    百度对象存储(BOS)(仅作为目的地)

    计算结果写入到指定的BOS(百度对象存储)Bucket中去,每天的数据对应一个文件。具体的文件名为数据目的地的名字加上日期和一个数字下标,文件扩展名与格式一致;例如: yyj_bos_sink_2018-06-25_0.csv

    消息服务(SMS)(仅作为目的地)

    计算结果直接以短信的方式发送,例如检测到异常后立即发送短信告警。该类型目的地,需要选择一个短信接收人,一个短信接收人绑定了发送短信的签名、模板、和一组手机号码。为了保证短信能正确发出,请确保流式任务输出的消息严格符合模板内容中变量的要求,即SQL语句中SELECT出的字段需要与短信模板中的变量一一对应。举个例子,假设短信接收人的短信模板内容如下:

    物联网设备${devid}的温度值过高,最新值为${temp}

    那么SQL语句中必须SELECT两个字段,不能多也不能少,并且分别通过AS语句命名成devidtemp。如下所示:

    INSERT INTO my_sms_sink SELECT deviceid AS devid, AVG(temperature) AS temp FROM ...

    函数计算(CFC)(仅作为目的地)

    将计算结果作为输入,调用相关的函数计算(CFC),以便自定义对计算结果的处理。

    上一篇
    基本概念
    下一篇
    格式