所有文档

          百度MapReduce BMR

          Flink

          Flink简介

          Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。

          创建集群

          登录百度智能云控制台,选择“产品服务->百度MapReduce BMR”,点击“创建集群”,进入集群创建页。BMR2.0.0及以上版本已支持 Flink 组件集成,购置集群时勾选 Flink 组件即可, 如下图所示:

          image.png

          使用简介

          1. 远程登录到创建好的集群中

            ssh hdfs@$public_ip
            使用创建集群时输入的密码

          2. 运行WordCount作业 (未开启Kerberos认证)
          • 先上传一份文件到HDFS中

            hdfs dfs -put /etc/hadoop/conf/core-site.xml /tmp

          • 执行如下命令,在Yarn上提交作业作业:

            flink run --jobmanager yarn-cluster \
            -yn 1 \
            -ytm 1024 \
            -yjm 1024 \
            /opt/bmr/flink/examples/batch/WordCount.jar \
            --input hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/core-site.xml \
            --output hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out
          • 作业成功提交后,最终的运行结果如下:

            image.png

          实时流计算(Scala)

          数据计算过程通过BMR的Flink消费百度消息服务BMS。本文以使用Scala为例,Flink版本1.8.2,线上Kafka版本2.1。具体步骤如下:

          第一步 创建Topic并下载百度消息服务的证书

          (本步骤详情请参考文档 Spark流式应用场景

          下载证书:

          image.png

          第二步 编写业务代码

          package com.baidu.inf.flink
           
          import java.util.Properties
           
          import org.apache.flink.api.common.serialization.SimpleStringSchema
          import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
          import org.apache.kafka.common.serialization.StringDeserializer
          import org.slf4j.LoggerFactory
           
          object CloudFlinkConsumeKafkaDemo {
            private val logger = LoggerFactory.getLogger(this.getClass)
           
            def main(args: Array[String]): Unit = {
              logger.info("************ Flink Consume Kafka Demo start **************")
              if (args.length < 7) {
                logger.error(" Parameters Are Missing , " +
                  "Needs : <topic> " +
                  "<groupId> " +
                  "<brokerHosts> " +
                  "<truststore_location> " +
                  "<truststore_pass> " +
                  "<keystore_location> " +
                  "<keystore_pass>")
                System.exit(-1)
              }
              val Array(topic, groupId, brokerHosts,
              truststore_location, truststore_pass,
              keystore_location, keystore_pass, _*) = args
           
              val env = StreamExecutionEnvironment.getExecutionEnvironment
              env.setParallelism(2)
              env.getConfig.disableSysoutLogging
           
              val kafkaProperties = new Properties()
              kafkaProperties.setProperty("bootstrap.servers", brokerHosts)
              kafkaProperties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
              kafkaProperties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
              kafkaProperties.setProperty("group.id", groupId)
              kafkaProperties.setProperty("auto.offset.reset", "latest")
              kafkaProperties.setProperty("serializer.class", "kafka.serializer.StringEncoder")
              kafkaProperties.setProperty("security.protocol", "SSL")
              kafkaProperties.setProperty("ssl.truststore.location", truststore_location)
              kafkaProperties.setProperty("ssl.truststore.password", truststore_pass)
              kafkaProperties.setProperty("ssl.keystore.location", keystore_location)
              kafkaProperties.setProperty("ssl.keystore.password", keystore_pass)
              kafkaProperties.setProperty("enable.auto.commit", "true")
           
              val ds = env.addSource(
                new FlinkKafkaConsumer[String](topic,
                  new SimpleStringSchema(),
                  kafkaProperties))
           
              ds.print()
              env.execute()
            }
          }

          第三步 编译代码,打成可执行Jar文件,上传到服务器上

          (注:要保证第一步下载的证书文件在集群每个节点上相同的路径下都存在)

          运行作业示例:
          flink run --jobmanager yarn-cluster -yn 1 -ytm 1024 -yjm 1024 /root/flink-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar "676c4bb9b72c49c7bd3b089c181af9ec__demo02" "group1" "kafka.fsh.baidubce.com:9091" "/tmp/client.truststore.jks" "kafka" "/tmp/client.keystore.jks" "0yw0ckrt"

          第四步 消息队列中生产一些消息,在Flink作业监控页面上查看对应输出

          通过Tunnel登录到集群的Yarn页面上(通过SSH-Tunnel访问集群

          在yarn console找到对应作业的application的单击application名称,进入作业详情页面:
          (在Flink的原生页面上,点击TaskManagers > Stdout,查看作业运行情况)

          image.png

          参考

          1. Flink应用场景
          2. Release Notes - Flink 1.8
          上一篇
          Zeppelin
          下一篇
          Druid