数据源和目的地

数据源是流式计算的输入,数据目的地是流式计算的输出。目前支持如下几种数据源和目的地。

物接入(MQTT)

消息来自于或者发往物接入,需指定实例名和MQTT主题。

百度消息服务(Kafka)

消息来自或者发往百度消息服务,需指定Kafka主题。

时序数据库(TSDB)

作为目的地

计算结果直接写入指定的TSDB。时间戳(timestamp)、度量(metric)、值(value)还有标签(tag)需在结果消息里面指定。即输出的消息需包含如下几个字段:

  • timestamp
  • metric
  • value
  • 一个或者多个标签(tag)

下面是一个目的地为TSDB的SQL举例:

INSERT INTO my_tsdb_sink 
SELECT 
    TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS `timestamp`,
    deviceid, 
    AVG(temperature) AS `value`,
    'devicetemperature' AS metric
FROM mysource 
GROUP BY 
    TUMBLE(rowtime, INTERVAL '5' MINUTE),
    deviceid

其中,timestamp, value均为SQL关键字,因此需要用重音符(`)引起来。deviceid为一个tag写进TSDB。

作为数据源

选择某个DB的某个度量(metric)的数据作为输入,并且可以处理已经存在的历史数据。系统会自动探测metric下所有的域和tag,并且自动生成各个字段。如果有遗漏的域或者tag,可以在定义数据源时,手动添加相关的字段。

对于名为x的域,添加的字段名应该为:fields.x,类型为相应的类型;对于名为y的tag,添加的字段名为tags.y,字段类型为STRING。

举例,假设下面是某个度量下面数据的定义:

属性名称 类型
ts SQL_TIMESTAMP
fields.value DOUBLE
tags.device STRING

可以类似如下在SQL中直接引用各个域和tag字段:

insert into mysink
select 
    tumble_start(rowtime, INTERVAL '30' SECOND) as `timestamp`,
    sum(`value`) as `value`,
    device
from mysource
group by tumble(rowtime,  INTERVAL '30' SECOND),
    device

百度对象存储(BOS)(仅作为目的地)

计算结果写入到指定的BOS(百度对象存储)Bucket中去,每天的数据对应一个文件。具体的文件名为数据目的地的名字加上日期和一个数字下标,文件扩展名与格式一致;例如: yyj_bos_sink_2018-06-25_0.csv

消息服务(SMS)(仅作为目的地)

计算结果直接以短信的方式发送,例如检测到异常后立即发送短信告警。该类型目的地,需要选择一个短信接收人,一个短信接收人绑定了发送短信的签名、模板、和一组手机号码。为了保证短信能正确发出,请确保流式任务输出的消息严格符合模板内容中变量的要求,即SQL语句中SELECT出的字段需要与短信模板中的变量一一对应。举个例子,假设短信接收人的短信模板内容如下:

物联网设备${devid}的温度值过高,最新值为${temp}

那么SQL语句中必须SELECT两个字段,不能多也不能少,并且分别通过AS语句命名成devidtemp。如下所示:

INSERT INTO my_sms_sink SELECT deviceid AS devid, AVG(temperature) AS temp FROM ...

函数计算(CFC)(仅作为目的地)

将计算结果作为输入,调用相关的函数计算(CFC),以便自定义对计算结果的处理。