开发作业
创建作业
-
选择“百度流式计算BSC>作业管理>作业开发”,进入到作业开发页面。
- 百度智能云目前开放多区域支持,如果您需要在多区域新增作业,请参考区域选择说明。
- 在不同区域创建的作业相互独立。
-
点击“新增作业”,显示“新增作业”弹出框,在弹出框中填写作业的基本信息。
说明:作业类型:目前仅支持“SQL作业”类型,如需了解SQL语法,请参考SQL使用手册。
- 点击“确定”进入到编辑作业页面。
编辑作业
为了方便用户快速了解流式作业包括哪些内容,为用户提供示例作业,示例作业的完整SQL语句如下(用户可以直接粘贴到编辑器中,进行作业调试):
CREATE table source_kafka(
username STRING,
cost_type STRING,
cost Float
) with(
type = 'BKAFKA',
topic = '8a04sge0cc6f6458b8aaf0fcd26608b03__bsc_test_source',
kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
sslFilePath = 'bsc_test_ce.zip',
encode = 'CSV'
);
CREATE TABLE source_mysql(
username1 STRING,
cost_type1 STRING
)WITH(
type = 'RDS',
user = 'zhangsan',
password = 'xxxx',
accountId = 'xxxx',
url = 'xxxx',
dbTable = 'xxx'
);
create table sink_kafka(
username String,
cost_type String,
cost Float
) with(
type = 'BKAFKA',
topic = '8a9003sdc6f6458b8aaf0fcd26608b03__bsc_test_sink',
kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
sslFilePath = 'bsc_test_ce.zip',
encode = 'CSV'
);
insert into
sink_kafka
select
username,
cost_type,
cost
from
source_mysql right join source_kafka on source_mysql.username1 = source_kafka.username
WHERE
cost > 1500;
接下来对示例作业的各个模块进行详细介绍。
添加kafka作为输入端
- 如果没有可用的Topic,需要先创建Topic。创建Topic的步骤请参考创建主题。
-
编写kafka作为输入端的DDL语句。
说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。
CREATE table source_kafka( username STRING, cost_type STRING, cost Float ) with( type = 'BKAFKA', topic = '8a04sge0cc6f6458b8aaf0fcd26608b03__bsc_test_source', kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091', sslFilePath = 'bsc_test_ce.zip', encode = 'CSV' );
With参数的含义:
参数名称 | 必填 | 描述 |
---|---|---|
type | 是 | 输入端接入的数据资源的类型,BKAFKA代表百度消息服务 |
topic | 是 | BKAFKA中用户自己创建的Topic |
kafka.bootstrap.servers | 是 | BKAFKA服务器地址和端口,地址和端口之间用“:”分隔。 北京区域填写:kafka.bj.baidubce.com:9091 广州区域填写:kafka.gz.baidubce.com:9092 |
sslFilePath | 是 | 该topic的证书文件,格式为zip,并且需要将通过“高级设置”将证书文件上传到服务中 |
encode | 是 | Kafka中数据的编码格式,支持json和csv格式的消息 |
添加RDS作为输入端
- 如果没有可用的RDS数据表,需要先使用RDS添加数据表。创建数据表的步骤请参考使用流程。
-
编写RDS作为输入端的DDL语句。
说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。
CREATE TABLE source_mysql( username1 STRING, cost_type1 STRING
)WITH( type = 'RDS', user = 'zhangsan', password = 'xxxx', accountId = 'xxxx', url = 'xxxx', dbTable = 'xxx' );
参数名称 | 必填 | 描述 |
---|---|---|
type | 是 | 输入端接入的数据资源的类型,此处填写“RDS” |
user | 是 | RDS的用户名 |
password | 是 | RDS用户名对应的密码 |
accountId | 是 | RDS用户的用户ID |
url | 是 | 通过jdbc访问rds的url,例如 jdbc:mysql://mysql56.rdsmiusj9oseag6.rds.bj.baidubce.com:3306/bsc_test?useUnicode=true&characterEncoding=UTF8 |
dbTable | 是 | 数据表名称 |
添加Kafka作为输出端
- 如果没有可用的Topic,需要先创建Topic。创建Topic的步骤请参考 创建主题。
-
编写kafka作为输出端的DDL语句。
说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。
create table sink_kafka( username String, cost_type String, cost Float ) with( type = 'BKAFKA', topic = '8a9003sdc6f6458b8aaf0fcd26608b03__bsc_test_sink', kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091', sslFilePath = 'bsc_test_ce.zip', encode = 'CSV' );
编写业务处理逻辑的SQL语句
insert into
sink_kafka
select
username,
cost_type,
cost
from
source_mysql right join source_kafka on source_mysql.username1 = source_kafka.username
WHERE
cost > 1500;
调试作业
-
点击右上角的“调试作业”,弹出上传输入端测试数据的页面。
- 上传csv文件:csv文件中的字段需要跟输入端数据表中的字段(包括字段顺序)保持一致。
- 手动录入:用户根据输入框中提示的字段类型输入相应类型的测试数据,测试数据需要去掉头部的字段名称
- 选择“source_mysql”,选择“手动录入”,将下面的测试数据粘贴到输入框。 室友 1,租金 室友 2,租金 室友 3,租金 室友 4,租金 室友 1,有线电视 室友 2,有线电视 室友 3,有线电视 室友 4,有线电视
-
选择“source_kafka”,选择“手动录入”,将下面的测试数据粘贴到输入框。
室友 1,租金,3600.00 室友 2,租金,3500.00 室友 3,租金,2000.00 室友 4,租金,2000.00 室友 1,有线电视,250.00 室友 2,有线电视,250.00 室友 3,有线电视,250.00 室友 4,有线电视,250.00 室友 3,日用品,1500.00
- 点击“开始调试”,开始运行作业。
-
查看作业运行过程中产生的调试日志和调试结果。
发布作业
点击“发布”,设置CU数据,然后发布作业。其中,CU是作业运行所需资源的基本单位,1CU包括1核CPU和4G内存。
运维作业
启动作业
- 选择“百度流式计算BSC>作业管理>作业运维”,进入到作业运维页面。
-
点击列表中操作这一列的“启动”,启动当前作业开始运行。
如需了解作业运行过程中的监控数据和运行日志相关内容,请参照作业运维。