所有文档

          Elasticsearch BES

          使用BSC将Kafka中的数据导入Es

          介绍

          本文主要介绍通过 BSC【百度流式计算服务】将数据从Kafka中导入到Es中。

          创建集群

          在将数据导入Es之前需要再百度云上创建Es集群,假定创建的集群信息如下: image4-1.png

          这里主要记录以下信息:

          • 集群ID: 296245916518715392
          • 创建集群时的密码: bbs_2016

          创建Kafka Topic

          登录百度云管理控制台,进入kafka产品界面,创建topic,并向topic中灌入数据。示例中我们灌入的数据如下,包括两个json字段,样例数据如下: image4-2.png

          示例中创建topic:a15fdd9dd5154845b32f7c74ae155ae3__demo_test 并且确保该topic下有对应的证书,将证书下载到本地。

          编辑BSC 作业

          创建Kafka Source

          进入BSC编辑作业界面,创建kafka source table, sql代码如下

          CREATE table source_table_kafka(
          stringtype STRING,
          longtype LONG
          ) with(
              type = 'BKAFKA',
              topic = 'a15fdd9dd5154845b32f7c74ae155ae3__demo_test',
              kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
              sslFilePath = 'kafka_key.zip',
              encode = 'json'
          );

          其中sslFilePath = 'kafka-key.zip',为上一步下载到本地的kafka证书。

          上传Kafka证书

          点击高级设置,上传kafka证书

          image4-3.png

          上传之后如下图

          image4-4.png

          创建Es Sink Table

          sql代码如下

          create table sink_table_es(
              stringtype String,
              longtype Long
          )with(
              type = 'ES',
              es.net.http.auth.user = 'superuser',
              es.net.http.auth.pass = 'bbs_2016',
              es.resource = 'bsc_test/doc_type',
              es.clusterId = '296245916518715392',
              es.region = 'bd',
              es.port = '8200',
              es.version = '6.5.3'
          );

          其中:

          • es.resource对应es的索引与类型,es会在bsc写入数据时自动创建指定索引
          • es.clusterId对应es的集群ID
          • es.region 表示 Es服务所在的地区的代码,可以参考 Es服务区域代码 中查询区域与代码的对应关系。

          编写导入语句

          sql语句如下:

          insert into
              sink_table_es(stringtype, longtype) outputmode append
          select
              stringtype,
              longtype
          from
              source_table_kafka;
              

          保存作业并发布运行作业

          查看Es中的数据

          image4-5.png

          上一篇
          使用BSC将BOS中的数据导入Es
          下一篇
          Kibana