Flink自定义JAR作业
更新时间:2024-01-04
背景
BSC 产品支持用户提交FLINK自定义jar作业,以读KAFKA写BOS为例,其具体步骤如下:
步骤
1. 开发作业
开发环境
使用IDEA进行开发,项目管理使用maven,相关版本为
名称 | 版本 |
---|---|
java | 1.8 |
scala | 2.11 |
flink | 1.11.2 |
项目结构
整体项目结构如下图所示。由于示例中使用了KAFKA,因此将对应kafka配置文件放在resource下一起打包到jar中。
pom文件
pom文件如下所示,相关的注意事项见文件注释
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 指定相关依赖的版本号 -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.11.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scope.setting>provided</scope.setting>
</properties>
<dependencies>
<!-- 1、 bsc运行环境中包含 flink 核心依赖,所以下面涉及到的 flink 核心依赖无需打到项目jar中,
在打包的时候需要指定scope为provided
-->
<!-- flink 核心依赖:flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.setting}</scope>
</dependency>
<!-- 2、 bsc运行环境中包含 flink 常用依赖,所以下面涉及到的 flink 其他依赖无需打到项目jar中,
在打包的时候需要指定scope为provided
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>${scope.setting}</scope>
</dependency>
<!-- 3、 bsc运行环境中不包含 flink connector依赖,所以下面涉及到的 flink connector 依赖需要打到项目jar中。
注意:
- bos相关的依赖无须额外引用,
- kafka必须使用0.10版本的connector和client
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>~~~~
</dependency>
<!-- 4、 下面可以引用 flink 之外的一些依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- scala编译插件 -->
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 按照上述的逻辑,必须打fat-jar才能把所有依赖提交到bsc中,因此需要使用此打包插件 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
demo代码
读kafka写bos的代码如下:
package com.baidu.bce.bsc.demo.flink;
import lombok.Getter;
import lombok.Setter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
* 使用Flink DataStream接口完成读kafka,写bos的jar作业示例demo
* 示例中:
* - 读kafka使用SSL协议,用户需自行指定kafka的kafka client参数,并利用demo提供的文件下载、解析能力,读取配置文件和bos上的证书zip包
* - 从kafka读取的数据为json格式,需要用户自行指定schema
* - 写bos进行了按照时间分桶的操作,以及生成commit文件
*/
public class Kafka2Bos {
// 启动日志处理
public static Logger logger = org.slf4j.LoggerFactory.getLogger(Kafka2Bos.class);
// 用于解析json
public static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
// 1. 获取参数
/**
* args(0) 为BSC为用户指定的checkpoint目录,无法更改。
* 如果用户想要使用SPARK的checkpoint功能,只能使用提供的args(0)作为checkpoint目录。
*/
String checkpointLocation = args[0];
/**
* args[1] 为BSC提供的作业运行参数,以base64编码存储,解码后需要解析成Map再使用
* 参数格式:
* key1=value1
* key2=value2
* 本代码中示例:
* bootStrapServer=kafka.bj.baidubce.com:9092
* topic=test
* bosEndpoint=https://bj.bcebos.com
* ...
*/
Map<String, String> variables = new LinkedHashMap<>();
try {
String kvStr = new String(Base64.getDecoder().decode(args[1]));
Arrays.stream(kvStr.split("\n")).forEach(kv -> {
String[] variable = kv.split("=");
variables.put(variable[0], variable[1]);
});
} catch (Exception e){
logger.error("decode job variables failed", e);
throw new Exception(e);
}
/** 为配置参数赋值 */
String sourceBootstrapServer = variables.get("bootStrapServer");
String sourceTopic = variables.get("topic");
String sourceGroupId = variables.get("groupId");
String certBosUrl = variables.get("certBosUrl");
String bosSink = variables.get("bosSink");
String bosDatatimePattern = variables.get("bosDatatimePattern");
// 2. create StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 配置一些运行参数,如检查点参数 */
env.setStateBackend(new FsStateBackend(checkpointLocation));
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointInterval(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 3. 创建 kafka streaming Source
// 设置 kafka consumer 参数,并读取 client.properties 中的配置
Properties sourceProp = new Properties();
sourceProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceBootstrapServer);
sourceProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, sourceGroupId);
ClassLoader classLoader = Kafka2Bos.class.getClassLoader();
sourceProp.load(classLoader.getResourceAsStream("client.properties"));
/**
* 提取kafka value中的数据字段, 配置用户自定义schema
* 使用RowTypeInfo的形式来配置所有的类型
*
* 原始value数据格式由用户上游kafka自定义,只需要能在以下逻辑中可以正确提取数据即可。
* 以json格式为例
* {
* "stringtype": "lRAhSQgShKn77uD",
* "longtype": 1199158871,
* "floattype": 0.038981155578358462,
* "binarytype": "null",
* "integertype": 1,
* "bytetype": -58,
* "booleantype": true,
* "doubletype": 439147658,
* "shorttype": 13538
* }
*/
List<DataType> dataTypes = Arrays.asList(
DataTypes.STRING(),
DataTypes.BIGINT(),
DataTypes.FLOAT(),
DataTypes.BYTES(),
DataTypes.INT(),
DataTypes.TINYINT(),
DataTypes.BOOLEAN(),
DataTypes.DOUBLE(),
DataTypes.SMALLINT());
TypeInformation[] types = dataTypes.stream()
.map(type -> TypeConversions.fromDataTypeToLegacyInfo(type))
.collect(Collectors.toList())
.toArray(new TypeInformation[dataTypes.size()]);
String[] filedNames = Arrays.asList(
"stringtype",
"longtype",
"floattype",
"binarytype",
"integertype",
"bytetype",
"booleantype",
"doubletype",
"shorttype")
.toArray(new String[dataTypes.size()]);
RowTypeInfo typeInfo = new RowTypeInfo(types, filedNames);
JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema.Builder(typeInfo).build();
/** 创建 FlinkKafkaConsumer,使用0.10版本 */
FlinkKafkaConsumer010<Row> flinkKafkaConsumer = new FlinkKafkaConsumer010<Row>(sourceTopic, schema, sourceProp) {
@Override
public void open(Configuration configuration) throws Exception {
this.properties.putAll(copySslFileAndGetLocalProperties(sourceProp, certBosUrl));
super.open(configuration);
}
};
flinkKafkaConsumer.setStartFromEarliest();
DataStream<Row> source = env.addSource(flinkKafkaConsumer);
// 4. 进行一些数据处理pipeline
// 此处举例为只提取需要的字段, 并转换json为字符串
/**
* 输出数据全部字段,但有输出转储的只有如下三个,分别为:
* {
* "stringtype": "lRAhSQgShKn77uD-1",
* "longtype": 1199158871,
* "floattype": 0.038981155578358462
* }
*/
DataStream<String> operatorDataStream = source.map(new MapFunction<Row, KafkaValue>() {
@Override
public KafkaValue map(Row row) throws Exception {
KafkaValue kafkaValue = new KafkaValue();
kafkaValue.setStringtype(row.getField(0).toString() + "-" + row.getField(4).toString());
kafkaValue.setLongtype((long)row.getField(1));
kafkaValue.setFloattype((float)row.getField(2));
return kafkaValue;
}
}).map(new MapFunction<KafkaValue, String>() {
@Override
public String map(KafkaValue kafkaValue) throws Exception {
return objectMapper.writeValueAsString(kafkaValue);
}
});
// 5. 创建 bos streaming Sink
// 这里使用了按照时间分桶的策略,并设置了滚动策略,配置了一些参数
StreamingFileSink.RowFormatBuilder bosStreamingFileSinkBuilder = StreamingFileSink
.forRowFormat(new Path(bosSink), new SimpleStringEncoder<>())
.withBucketAssigner(new DateTimeBucketAssigner(bosDatatimePattern))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(1))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
.withMaxPartSize(128L * 1024L * 1024L)
.build())
.withBucketCheckInterval(10000);
StreamingFileWriter fileWriter = new StreamingFileWriter(
bosStreamingFileSinkBuilder.getBucketCheckInterval(), bosStreamingFileSinkBuilder);
// 设置了commit算子,用于标志该时间段数据已经写完
DataStream<StreamingFileCommitter.CommitMessage> writerStream = operatorDataStream
.transform(StreamingFileWriter.class.getSimpleName(),
TypeExtractor.createTypeInfo(CommitMessage.class),
(OneInputStreamOperator) fileWriter)
.setParallelism(1);
writerStream.addSink(new DiscardingSink()).setParallelism(1);
// 6. 启动执行
env.execute("flink-kafka-to-bos-jar-demo");
}
/**
* kafka value 对应的对象格式
*/
@Getter
@Setter
public static class KafkaValue {
private String stringtype;
private Long longtype;
private Float floattype;
private String binarytype;
private Integer integertype;
private Byte bytetype;
private Boolean booleantype;
private Double doubletype;
private Short shorttype;
}
/**
* 从bos下载ssl证书压缩包,解压后加入到配置中
* 注意:需要先在bos上创建好对应的目录,并设置权限为公共读,并且传入的url应该为普通访问链接,而非CDN加速链接
* @param properties
* @return
*/
private static Properties copySslFileAndGetLocalProperties(Properties properties, String certBosUrl) {
String userDir = System.getProperty("user.dir");
/** 尝试3次,如果失败则抛出异常 */
int i = 0;
for (; i < 3; i++) {
try {
FileUtil.downloadBosFileAndUnzip(userDir, certBosUrl);
break;
} catch (IOException e) {
logger.error("download bos file fail when try: {}", i, e);
}
}
if (i >= 3) {
throw new RuntimeException("download bos file fail");
}
properties.setProperty("ssl.truststore.location", userDir + "/" + properties.getProperty("ssl.truststore.location"));
properties.setProperty("ssl.keystore.location", userDir + "/" + properties.getProperty("ssl.keystore.location"));
logger.info("ssl.truststore.location: " + properties.getProperty("ssl.truststore.location"));
logger.info("ssl.keystore.location: " + properties.getProperty("ssl.keystore.location"));
return properties;
}
/**
* bos文件下载工具类
*/
public static class FileUtil {
private static final String PATH_SEPARATOR = "/";
/**
* 通过url下载bos上的xx.tar.gz(设置为公共读权限),
* 并解压至指定路径生成一个文件目录
* @param outputDir
* @param url
*/
public static void downloadBosFileAndUnzip(String outputDir, String url) throws IOException {
URLConnection connection = new URL(url).openConnection();
ZipInputStream zipIn = null;
try{
zipIn = new ZipInputStream(
new BufferedInputStream(connection.getInputStream()));
ZipEntry entry;
while ((entry = zipIn.getNextEntry()) != null ) {
if (entry.isDirectory()) {
createDirectory(outputDir, entry.getName());
} else {
File tmpFile = new File(outputDir + PATH_SEPARATOR + entry.getName());
OutputStream out = null;
try{
out = new FileOutputStream(tmpFile);
int length = 0;
byte[] b = new byte[2048];
while ((length = zipIn.read(b)) != -1) {
out.write(b, 0, length);
}
} catch (IOException ex){
logger.error("write to {} fail", tmpFile, ex);
} finally {
if (out != null) {
out.close();
}
}
}
}
} catch (IOException ex){
throw new IOException("解压归档文件出现异常",ex);
} finally {
try {
if (zipIn != null) {
zipIn.close();
}
} catch (IOException ex){
throw new IOException("关闭tarFile出现异常",ex);
}
}
}
/**
* 构建证书目录
* @param outputDir
* @param subDir
*/
public static void createDirectory(String outputDir, String subDir){
File file = new File(outputDir + PATH_SEPARATOR + subDir);
if (!file.exists()) {
file.mkdir();
}
}
}
}
项目打包
执行 mvn clean package
命令之后,能编译出用于bsc运行的jar包 flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
2. 新增资源
进入BSC控制台,选择资源管理,点击新增资源按钮。资源类型需要选择为JOB_FILE/JAR,上传方式与jar包大小相关,可以选择bos上传或者本地上传。
上传完成之后效果如下:
3. 新增作业
打开BSC控制台,点击新建作业按钮,新建一个FLINK_STREAM/JAR作业如下图示例。
编辑作业参数
在作业开发的富文本编辑框中,配置SPARK jar的参数信息如示例:
-- 函数完整类名
main.class=com.baidu.bce.bsc.demo.flink.Kafka2Bos;
-- 完整主类名JAR包的资源名称
main.jar=flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
-- 函数参数设置,必须以“main.args.”开头
main.args.bootStrapServer=kafka.gz.baidubce.com:9092;
main.args.topic=topictopictopic;
main.args.groupId=groupgroupgroup;
main.args.certBosUrl=https://bucket.bj.bcebos.com/kafka-key.zip;
main.args.bosSink=bos://bucket/object;
main.args.bosDatatimePattern=yyyy-MM-dd--HH;
引用jar资源
在资源引用栏中选择刚才上传的jar包,点击引用;并将资源详情中的资源原名作为作业参数main.jar的参数
保存发布作业
依次点击保存、发布按钮,将作业发布到作业运维列表。
4. 运行作业
切换到作业运维的详情页面,点击启动,选择相应的网络参数并申请启动资源,点击确认。如果使用了检查点,可以选择从上次作业停止时间点启动
启动之后,可以看到作业运行日志,但jar作业不支持实时监控的查看。
5. 更新作业
如果需要更新作业jar包,需要按照如下步骤执行:
- 停止运行中的作业。
- 在资源管理列表对相应的jar包发起"新增版本"操作。
- 在作业开发页面对相应作业的资源引用执行"解绑-引用"操作。
- 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
- 重启启动作业。
如果仅仅是需要修改作业参数,可以简化步骤为:
- 停止运行中的作业。
- 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
- 重启启动作业。