开发作业
所有文档

          百度流式计算 BSC

          开发作业

          创建作业

          1. 选择“百度流式计算BSC>作业管理>作业开发”,进入到作业开发页面。

            • 百度智能云目前开放多区域支持,如果您需要在多区域新增作业,请参考区域选择说明
            • 在不同区域创建的作业相互独立。
          2. 点击“新增作业”,显示“新增作业”弹出框,在弹出框中填写作业的基本信息。

            说明:作业类型:目前仅支持“SQL作业”类型,如需了解SQL语法,请参考SQL使用手册

          3. 点击“确定”进入到编辑作业页面。

          编辑作业

          为了方便用户快速了解流式作业包括哪些内容,为用户提供示例作业,示例作业的完整SQL语句如下(用户可以直接粘贴到编辑器中,进行作业调试):

          CREATE table source_kafka(
              username STRING,
              cost_type STRING,
              cost Float
          ) with(
              type = 'BKAFKA',
              topic = '8a04sge0cc6f6458b8aaf0fcd26608b03__bsc_test_source',
              kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
              sslFilePath = 'bsc_test_ce.zip',
              encode = 'CSV'
          );
          CREATE TABLE source_mysql(
            username1 STRING,
            cost_type1 STRING  
          )WITH(
              type = 'RDS',
              user = 'zhangsan',
              password = 'xxxx',
              accountId = 'xxxx',
              url = 'xxxx',
              dbTable = 'xxx'
          );
          create table sink_kafka(
              username String,
              cost_type String,
              cost Float
          ) with(
              type = 'BKAFKA',
              topic = '8a9003sdc6f6458b8aaf0fcd26608b03__bsc_test_sink',
              kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
              sslFilePath = 'bsc_test_ce.zip',
              encode = 'CSV'
          );
          insert into
              sink_kafka
          select
              username,
              cost_type,
              cost
          from
              source_mysql right join source_kafka on source_mysql.username1 = source_kafka.username
          WHERE
                cost > 1500;

          接下来对示例作业的各个模块进行详细介绍。

          添加kafka作为输入端

          1. 如果没有可用的Topic,需要先创建Topic。创建Topic的步骤请参考创建主题。
          2. 编写kafka作为输入端的DDL语句。

            说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。

            CREATE table source_kafka( username STRING, cost_type STRING, cost Float ) with( type = 'BKAFKA', topic = '8a04sge0cc6f6458b8aaf0fcd26608b03__bsc_test_source', kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091', sslFilePath = 'bsc_test_ce.zip', encode = 'CSV' );

          With参数的含义:

          参数名称 必填 描述
          type 输入端接入的数据资源的类型,BKAFKA代表百度消息服务
          topic BKAFKA中用户自己创建的Topic
          kafka.bootstrap.servers BKAFKA服务器地址和端口,地址和端口之间用“:”分隔。
          北京区域填写:kafka.bj.baidubce.com:9091
          广州区域填写:kafka.gz.baidubce.com:9092
          sslFilePath 该topic的证书文件,格式为zip,并且需要将通过“高级设置”将证书文件上传到服务中
          encode Kafka中数据的编码格式,支持json和csv格式的消息

          添加RDS作为输入端

          1. 如果没有可用的RDS数据表,需要先使用RDS添加数据表。创建数据表的步骤请参考使用流程。
          2. 编写RDS作为输入端的DDL语句。

            说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。

            CREATE TABLE source_mysql( username1 STRING, cost_type1 STRING
            )WITH( type = 'RDS', user = 'zhangsan', password = 'xxxx', accountId = 'xxxx', url = 'xxxx', dbTable = 'xxx' );

          参数名称 必填 描述
          type 输入端接入的数据资源的类型,此处填写“RDS”
          user RDS的用户名
          password RDS用户名对应的密码
          accountId RDS用户的用户ID
          url 通过jdbc访问rds的url,例如
          jdbc:mysql://mysql56.rdsmiusj9oseag6.rds.bj.baidubce.com:3306/bsc_test?useUnicode=true&characterEncoding=UTF8
          dbTable 数据表名称

          添加Kafka作为输出端

          1. 如果没有可用的Topic,需要先创建Topic。创建Topic的步骤请参考 创建主题。
          2. 编写kafka作为输出端的DDL语句。

            说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。

            create table sink_kafka( username String, cost_type String, cost Float ) with( type = 'BKAFKA', topic = '8a9003sdc6f6458b8aaf0fcd26608b03__bsc_test_sink', kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091', sslFilePath = 'bsc_test_ce.zip', encode = 'CSV' );

          编写业务处理逻辑的SQL语句

          insert into
              sink_kafka
          select
              username,
              cost_type,
              cost
          from
              source_mysql right join source_kafka on source_mysql.username1 = source_kafka.username
          WHERE
                cost > 1500;

          调试作业

          1. 点击右上角的“调试作业”,弹出上传输入端测试数据的页面。

            • 上传csv文件:csv文件中的字段需要跟输入端数据表中的字段(包括字段顺序)保持一致。
            • 手动录入:用户根据输入框中提示的字段类型输入相应类型的测试数据,测试数据需要去掉头部的字段名称
          2. 选择“source_mysql”,选择“手动录入”,将下面的测试数据粘贴到输入框。 室友 1,租金 室友 2,租金 室友 3,租金 室友 4,租金 室友 1,有线电视 室友 2,有线电视 室友 3,有线电视 室友 4,有线电视
          3. 选择“source_kafka”,选择“手动录入”,将下面的测试数据粘贴到输入框。

            室友 1,租金,3600.00 
            室友 2,租金,3500.00 
            室友 3,租金,2000.00 
            室友 4,租金,2000.00 
            室友 1,有线电视,250.00 
            室友 2,有线电视,250.00 
            室友 3,有线电视,250.00 
            室友 4,有线电视,250.00 
            室友 3,日用品,1500.00
          4. 点击“开始调试”,开始运行作业。
          5. 查看作业运行过程中产生的调试日志和调试结果。

          发布作业

          点击“发布”,设置CU数据,然后发布作业。其中,CU是作业运行所需资源的基本单位,1CU包括1核CPU和4G内存。

          运维作业

          启动作业

          1. 选择“百度流式计算BSC>作业管理>作业运维”,进入到作业运维页面。
          2. 点击列表中操作这一列的“启动”,启动当前作业开始运行。

          如需了解作业运行过程中的监控数据和运行日志相关内容,请参照作业运维。

          上一篇
          开通服务
          下一篇
          操作指南