Flink
Flink简介
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。
创建集群
登录百度智能云控制台,选择“产品服务->MapReduce BMR”,点击“创建集群”,进入集群创建页。BMR2.1.1及以上版本已支持 Flink 组件集成,购置集群时勾选 Flink 组件即可, 如下图所示: 注意: 不同BMR版本对应支持的Flink组件版本也不同,具体支持版本以选择BMR版本后可选服务中组件版本为准。
使用简介
-
远程登录到创建好的集群中
ssh hdfs@$public_ip
使用创建集群时输入的密码 - 运行WordCount作业 (未开启Kerberos认证)
-
先上传一份文件到HDFS中
hdfs dfs -put /etc/hadoop/conf/core-site.xml /tmp
-
执行如下命令,在Yarn上提交作业作业:
flink run --jobmanager yarn-cluster \ -yn 1 \ -ytm 1024 \ -yjm 1024 \ /opt/bmr/flink/examples/batch/WordCount.jar \ --input hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/core-site.xml \ --output hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out
-
作业成功提交后,最终的运行结果如下:
实时流计算(Scala)
数据计算过程通过BMR的Flink消费百度消息服务BMS。本文以使用Scala为例,Flink版本1.8.2,线上Kafka版本2.1。具体步骤如下:
第一步 创建Topic并下载百度消息服务的证书
(本步骤详情请参考文档 Spark流式应用场景)
下载证书:
第二步 编写业务代码
package com.baidu.inf.flink
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
object CloudFlinkConsumeKafkaDemo {
private val logger = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
logger.info("************ Flink Consume Kafka Demo start **************")
if (args.length < 7) {
logger.error(" Parameters Are Missing , " +
"Needs : <topic> " +
"<groupId> " +
"<brokerHosts> " +
"<truststore_location> " +
"<truststore_pass> " +
"<keystore_location> " +
"<keystore_pass>")
System.exit(-1)
}
val Array(topic, groupId, brokerHosts,
truststore_location, truststore_pass,
keystore_location, keystore_pass, _*) = args
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
env.getConfig.disableSysoutLogging
val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", brokerHosts)
kafkaProperties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
kafkaProperties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
kafkaProperties.setProperty("group.id", groupId)
kafkaProperties.setProperty("auto.offset.reset", "latest")
kafkaProperties.setProperty("serializer.class", "kafka.serializer.StringEncoder")
kafkaProperties.setProperty("security.protocol", "SSL")
kafkaProperties.setProperty("ssl.truststore.location", truststore_location)
kafkaProperties.setProperty("ssl.truststore.password", truststore_pass)
kafkaProperties.setProperty("ssl.keystore.location", keystore_location)
kafkaProperties.setProperty("ssl.keystore.password", keystore_pass)
kafkaProperties.setProperty("enable.auto.commit", "true")
val ds = env.addSource(
new FlinkKafkaConsumer[String](topic,
new SimpleStringSchema(),
kafkaProperties))
ds.print()
env.execute()
}
}
第三步 编译代码,打成可执行Jar文件,上传到服务器上
(注:要保证第一步下载的证书文件在集群每个节点上相同的路径下都存在)
运行作业示例:
flink run --jobmanager yarn-cluster -yn 1 -ytm 1024 -yjm 1024 /root/flink-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar "676c4bb9b72c49c7bd3b089c181af9ec__demo02" "group1" "kafka.fsh.baidubce.com:9091" "/tmp/client.truststore.jks" "kafka" "/tmp/client.keystore.jks" "0yw0ckrt"
第四步 消息队列中生产一些消息,在Flink作业监控页面上查看对应输出
通过Tunnel登录到集群的Yarn页面上(通过SSH-Tunnel访问集群)
在yarn console找到对应作业的application的单击application名称,进入作业详情页面:
(在Flink的原生页面上,点击TaskManagers > Stdout,查看作业运行情况)