Flink使用指南
更新时间:2024-01-19
Flink
flink-bos-hadoop是百度云对象存储系统BOS针对 Flink 的文件系统实现,并且支持了 recoverwriter 接口,Flink 可以基于该文件系统实现读写 BOS 上的数据以及作为流应用的状态后端。
安装
1.Flink环境准备
以1.15.0版本为例。
# 下载到一个路径
wget https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
#解压缩
tar zxvf flink-1.15.0-bin-scala_2.12.tgz
2.添加依赖jar包和配置
在启动 Flink 之前,下载flink-bos-hadoop插件。
# flink-bos-hadoop插件导入flink下
mkdir ./plugins/bos-fs-hadoop
cp flink-bos-hadoop-1.15.0-0.1.0.jar ./plugins/bos-fs-hadoop/
# 访问BOS的一些必要配置
vim ./conf/flink-conf.yaml
...
cat ./conf/flink-conf.yaml
fs.bos.impl: org.apache.hadoop.fs.bos.BaiduBosFileSystem
fs.AbstractFileSystem.bos.impl: org.apache.hadoop.fs.bos.BOS
fs.bos.access.key: {your ak}
fs.bos.secret.access.key: {your sk}
fs.bos.endpoint: bj.bcebos.com {your bucket endpoint}
使用
启动
./bin/start-cluster.sh
提交作业
# 通过以下格式指定路径,BOS 对象可类似于普通文件使用:bos://<your-bucket>/{object-name}
./bin/flink run examples/streaming/WordCount.jar --input "bos://my_bucket/students.txt" --output "bos://my_bucket/out"
查看运行结果
# 查看wordcount统计结果
$ hadoop fs -ls bos://my_bucket/out
Found 1 items
drwxrwxrwx - 0 1970-01-01 08:00 bos://my_bucket/out/2023-08-10--15
$ hadoop fs -ls bos://my_bucket/out/2023-08-10--15/
Found 1 items
-rw-rw-rw- 1 1792 2023-08-10 15:52 bos://my_bucket/out/2023-08-10--15/part-3053774f-2d8e-40c5-aa3c-01402ce4b6b4-0
$ hadoop fs -cat bos://my_bucket/out/2023-08-10--15/part-3053774f-2d8e-40c5-aa3c-01402ce4b6b4-0
(name,1)
(studentname,1)
(age,1)
...