数据源和目的地
Creek支持丰富的数据源和目的地,方便与多种上下游对接。目前支持的connector包括如下几种,并且还在持续的增加中。
MQTT数据源和目的地
MQTT数据源
MQTT数据源支持以MQTT协议订阅指定主题的消息,作为流式作业的输入。在source中的type值为:MQTT
该数据源通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
host | MQTT broker的地址,包括协议和端口,如tcp://creek.mqtt.iot.bj.baidubce.com:1883 |
topic | MQTT主题,支持通配符,例如:creek/input/# |
user | [可选]MQTT连接用户名,如: myusername |
pass | [可选]MQTT连接密码,如: mypass123 |
clientId | [可选]MQTT连接的clientid,默认creekmqtt_src<系统时间(毫秒)> |
qos | [可选]MQTT订阅的服务质量,可能取值:0,1,2, 默认为1 |
MQTT目的地
计算结果发送到指定的MQTT主题。在sink中的type值为:MQTT
该数据目的地通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
host | MQTT broker的地址,包括协议和端口,如tcp://creek.mqtt.iot.bj.baidubce.com:1883 |
topic | MQTT主题,不支持通配符,例如:creek/output |
user | [可选]MQTT连接用户名,如: myusername |
pass | [可选]MQTT连接密码,如: mypass123 |
clientId | [可选]MQTT连接的clientid,默认creekmqtt_sink<系统时间(毫秒)> |
qos | [可选]MQTT发布的服务质量,可能取值:0,1,2,默认为1 |
retained | [可选]MQTT发布的消息是否设置retained标志,默认为false |
KAFKA数据源和目的地
KAFKA数据源
从一个或者多个Kafka主题订阅消息,作为流式作业的输入。在source中的type值为:KAFKA
该数据源通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
broker | kafka broker的服务器地址列表,例如:127.0.0.1:9092 |
topic | 订阅的一个或者多个主题,多种主题通过逗号分隔。例如:topic1,topic2 |
version | Kafka Broker的版本号,字符串类型。例如:1.0.2 |
auto.offset.reset | [可选]当group没有提交过offset,系统从主题的当前位置,还是最早的位置开始消费。可选earliest和largest,默认为largest |
示例
"attr": {
"version": "1.0.2",
"topic": "topic1,topic2",
"auto.offset.reset": "earliest",
"broker": "127.0.0.1:9092"
}
KAFKA目的地
计算结果输出到指定的Kafka主题。在sink中的type值为:KAFKA
该数据目的地通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
broker | kafka broker的服务器地址列表,例如:127.0.0.1:9092 |
topic | 目的kafka主题,例如:topic1,数据均衡的发送到各个partition |
FILE数据源和目的地
FILE数据源
从一个文本文件中,逐行读入数据,作为流式作业的输入,一行对应一条记录,如果tail为false,读取到文件末尾,则退出作业。在source中的type值为:FILE。
该数据源通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
input | 输入文件的路径 |
tail | [可选]是否跟踪文件的变化,如果设置为true,则会持续监控文件的更新,包括文件的rotate,永不结束。默认为false |
FILE目的地
计算结果输出到指定的文件,每条记录以换行符分隔,文件不存在会自动创建,总是以APPEND方式写,在故障恢复期间,可能会截断文件,以实现exactly once的处理。在sink中的type值为:FILE。
该数据目的地通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
output | 目标文件的路径 |
标准输入和输出
标准输入STDIN
为了方便调试和测试,可以使用STDIN的输入类型,作业启动后从命令行读入数据,一行一条,当遇到EOF(linux和mac:ctrl+d, windows: ctrl+z)时,作业有序退出。在source中的type值为:STDIN
该数据源类型没有其他参数。
标准输出STDOUT
为了方便调试和测试,可以使用STDOUT的输出类型,作业的计算结果打印到标准屏幕,每条以换行符分隔。在sink中的type值为:STDOUT
该数据目的地没有其他参数。
固定输入COLLECTION
为了方便调试和测试,当需要给定固定的输入数据集时,可以使用COLLECTION类型的source。作业运行起来从source的input数组中读取数据,处理完成后,退出作业。在source中的type值为:COLLECTION
该数据源通过attr对象指定相关的参数:
属性 | 说明 |
---|---|
input | 应该为字符串数组 |
示例
"attr": {
"input": [
"2019-11-25 11:50:13.174,device01,23.5",
"2019-11-25 11:51:14.174,device02,24.9"
]
}