简介:本文详细介绍Flink单机部署的完整流程,涵盖环境准备、配置文件修改、启动命令及常见问题处理,适合开发者快速搭建本地开发环境。
Flink单机部署对硬件要求较低,但需满足最低配置标准:
关键点:内存不足会导致JobManager或TaskManager崩溃,磁盘空间不足可能引发检查点失败。建议通过free -h(Linux)或任务管理器(Windows)检查资源。
Flink依赖Java 8或11(推荐11):
# 检查Java版本java -version# 若未安装,Ubuntu示例:sudo apt update && sudo apt install openjdk-11-jdk
验证:echo $JAVA_HOME应返回JDK路径(如/usr/lib/jvm/java-11-openjdk-amd64)。
若使用Scala API,需安装对应版本(与Flink版本匹配,如Flink 1.17支持Scala 2.12):
# Ubuntu安装Scala 2.12sudo apt install scala# 验证scala -version
从Apache Flink官网选择版本(推荐稳定版,如1.17.0):
wget https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgztar -xzf flink-1.17.0-bin-scala_2.12.tgzcd flink-1.17.0
注意:解压目录需有读写权限,避免后续启动失败。
位于conf/目录,关键参数如下:
# JobManager内存(默认1024MB,复杂作业需增加)jobmanager.memory.process.size: 2048mb# TaskManager内存(默认1024MB,按作业需求调整)taskmanager.memory.process.size: 2048mb# TaskManager槽位数(默认1,根据CPU核心数设置)taskmanager.numberOfTaskSlots: 4# Web UI端口(默认8081,冲突时修改)rest.port: 8081
优化建议:
总内存 = JobManager内存 + TaskManager内存 * 节点数 槽位数 = CPU核心数 * 1.5(避免过度分配)localhost) localhost) 文件内容示例:
# conf/masterslocalhost:8081# conf/workerslocalhost
./bin/start-cluster.sh
现象:终端会持续输出日志,按Ctrl+C终止。
./bin/start-cluster.sh --daemon
终止命令:
./bin/stop-cluster.sh
日志位于log/目录:
flink-*-jobmanager-*.log:JobManager日志 flink-*-taskmanager-*.log:TaskManager日志 常见错误:
OutOfMemoryError:内存配置不足 Port already in use:端口冲突(修改rest.port) ClassNotFoundException:依赖缺失(检查lib/目录)浏览器打开http://localhost:8081,应显示Flink Dashboard:
截图示例:
(实际需自行截图,展示任务提交、运行状态等)
WordCount.java):public class WordCount {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream
“Hello Flink”, “Hello World”, “Flink is awesome”
);
DataStream
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);
counts.print();
env.execute(“WordCount Example”);
}
public static final class Tokenizer implements FlatMapFunction
@Override
public void flatMap(String value, Collector
String[] words = value.toLowerCase().split(“\W+”);
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
2. 编译打包(需Maven或Gradle):```bashmvn clean package
./bin/flink run -c com.example.WordCount /path/to/wordcount-1.0.jar
创建words.sql文件:
CREATE TABLE source (word STRING) WITH ('connector' = 'datagen','fields.word.length' = '5');CREATE TABLE sink (word STRING, cnt BIGINT) WITH ('connector' = 'print');INSERT INTO sinkSELECT word, COUNT(*) as cntFROM sourceGROUP BY word;
提交SQL作业:
./bin/sql-client.sh embed -f words.sql
print()输出的结果(位于log/目录) 通过Web UI的Metrics标签页查看:
Could not start the TaskManager原因:内存不足或端口冲突
解决:
taskmanager.memory.process.size taskmanager.rpc.port(默认6123) ClassNotFoundException原因:依赖缺失
解决:
lib/目录 --classpath参数指定依赖路径 DataStream API not found原因:Flink版本与API不兼容
解决:检查pom.xml中的Flink版本是否与安装版本一致
Checkpoint failed原因:磁盘空间不足或权限问题
解决:
state.backend.fs.checkpointdir) taskmanager.memory.off-heap.size(处理大对象时) taskmanager.memory.managed.fraction(默认0.4) parallelism.default配置(默认1) setParallelism(4) 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 全局并发度DataStream<String> text = env.readTextFile("input.txt").setParallelism(2); // 算子级并发度
flink-conf.yaml与二进制包解耦 lib/目录或Maven Shade插件打包 扩展阅读:
通过本文,开发者可完成Flink单机环境的完整部署,并掌握作业提交、监控及调试的核心技能,为后续集群部署或生产环境优化奠定基础。