规则引擎Rule Engine

    数据源和目的地

    Creek支持丰富的数据源和目的地,方便与多种上下游对接。目前支持的connector包括如下几种,并且还在持续的增加中。

    MQTT数据源和目的地

    MQTT数据源

    MQTT数据源支持以MQTT协议订阅指定主题的消息,作为流式作业的输入。在source中的type值为:MQTT

    该数据源通过attr对象指定相关的参数:

    属性 说明
    host MQTT broker的地址,包括协议和端口,如tcp://creek.mqtt.iot.bj.baidubce.com:1883
    topic MQTT主题,支持通配符,例如:creek/input/#
    user [可选]MQTT连接用户名,如: myusername
    pass [可选]MQTT连接密码,如: mypass123
    clientId [可选]MQTT连接的clientid,默认creekmqtt_src<系统时间(毫秒)>
    qos [可选]MQTT订阅的服务质量,可能取值:0,1,2, 默认为1

    MQTT目的地

    计算结果发送到指定的MQTT主题。在sink中的type值为:MQTT

    该数据目的地通过attr对象指定相关的参数:

    属性 说明
    host MQTT broker的地址,包括协议和端口,如tcp://creek.mqtt.iot.bj.baidubce.com:1883
    topic MQTT主题,不支持通配符,例如:creek/output
    user [可选]MQTT连接用户名,如: myusername
    pass [可选]MQTT连接密码,如: mypass123
    clientId [可选]MQTT连接的clientid,默认creekmqtt_sink<系统时间(毫秒)>
    qos [可选]MQTT发布的服务质量,可能取值:0,1,2,默认为1
    retained [可选]MQTT发布的消息是否设置retained标志,默认为false

    KAFKA数据源和目的地

    KAFKA数据源

    从一个或者多个Kafka主题订阅消息,作为流式作业的输入。在source中的type值为:KAFKA

    该数据源通过attr对象指定相关的参数:

    属性 说明
    broker kafka broker的服务器地址列表,例如:127.0.0.1:9092
    topic 订阅的一个或者多个主题,多种主题通过逗号分隔。例如:topic1,topic2
    version Kafka Broker的版本号,字符串类型。例如:1.0.2
    auto.offset.reset [可选]当group没有提交过offset,系统从主题的当前位置,还是最早的位置开始消费。可选earliest和largest,默认为largest

    示例

    "attr": {
    	"version": "1.0.2",
    	"topic": "topic1,topic2",
    	"auto.offset.reset": "earliest",
    	"broker": "127.0.0.1:9092"
    }

    KAFKA目的地

    计算结果输出到指定的Kafka主题。在sink中的type值为:KAFKA

    该数据目的地通过attr对象指定相关的参数:

    属性 说明
    broker kafka broker的服务器地址列表,例如:127.0.0.1:9092
    topic 目的kafka主题,例如:topic1,数据均衡的发送到各个partition

    FILE数据源和目的地

    FILE数据源

    从一个文本文件中,逐行读入数据,作为流式作业的输入,一行对应一条记录,如果tail为false,读取到文件末尾,则退出作业。在source中的type值为:FILE。

    该数据源通过attr对象指定相关的参数:

    属性 说明
    input 输入文件的路径
    tail [可选]是否跟踪文件的变化,如果设置为true,则会持续监控文件的更新,包括文件的rotate,永不结束。默认为false

    FILE目的地

    计算结果输出到指定的文件,每条记录以换行符分隔,文件不存在会自动创建,总是以APPEND方式写,在故障恢复期间,可能会截断文件,以实现exactly once的处理。在sink中的type值为:FILE。

    该数据目的地通过attr对象指定相关的参数:

    属性 说明
    output 目标文件的路径

    标准输入和输出

    标准输入STDIN

    为了方便调试和测试,可以使用STDIN的输入类型,作业启动后从命令行读入数据,一行一条,当遇到EOF(linux和mac:ctrl+d, windows: ctrl+z)时,作业有序退出。在source中的type值为:STDIN

    该数据源类型没有其他参数。

    标准输出STDOUT

    为了方便调试和测试,可以使用STDOUT的输出类型,作业的计算结果打印到标准屏幕,每条以换行符分隔。在sink中的type值为:STDOUT

    该数据目的地没有其他参数。

    固定输入COLLECTION

    为了方便调试和测试,当需要给定固定的输入数据集时,可以使用COLLECTION类型的source。作业运行起来从source的input数组中读取数据,处理完成后,退出作业。在source中的type值为:COLLECTION

    该数据源通过attr对象指定相关的参数:

    属性 说明
    input 应该为字符串数组

    示例

    "attr": {
                "input": [
                    "2019-11-25 11:50:13.174,device01,23.5",
                    "2019-11-25 11:51:14.174,device02,24.9"
                ]
            }
    上一篇
    作业定义
    下一篇
    运行时设置