Flink自定义JAR作业
所有文档
menu

百度流式计算 BSC

Flink自定义JAR作业

产品详情

背景

BSC 产品支持用户提交FLINK自定义jar作业,以读KAFKA写BOS为例,其具体步骤如下:

步骤

1. 开发作业

开发环境

使用IDEA进行开发,项目管理使用maven,相关版本为

名称 版本
java 1.8
scala 2.11
flink 1.11.2

项目结构

整体项目结构如下图所示。由于示例中使用了KAFKA,因此将对应kafka配置文件放在resource下一起打包到jar中。

image.png

pom文件

pom文件如下所示,相关的注意事项见文件注释

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--  指定相关依赖的版本号  -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scope.setting>provided</scope.setting>
    </properties>

    <dependencies>
        <!--  1、 bsc运行环境中包含 flink 核心依赖,所以下面涉及到的 flink 核心依赖无需打到项目jar中,
              在打包的时候需要指定scope为provided
        -->
        <!--  flink 核心依赖:flink-table-api-java  -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.setting}</scope>
        </dependency>


        <!--  2、 bsc运行环境中包含 flink 常用依赖,所以下面涉及到的 flink 其他依赖无需打到项目jar中,
              在打包的时候需要指定scope为provided
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.setting}</scope>
        </dependency>


        <!--  3、 bsc运行环境中不包含 flink connector依赖,所以下面涉及到的 flink connector 依赖需要打到项目jar中。
                 注意:
                   - bos相关的依赖无须额外引用,
                   - kafka必须使用0.10版本的connector和client
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>~~~~
        </dependency>

        <!--  4、 下面可以引用 flink 之外的一些依赖  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!--  scala编译插件  -->
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--  按照上述的逻辑,必须打fat-jar才能把所有依赖提交到bsc中,因此需要使用此打包插件 -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

demo代码

读kafka写bos的代码如下:

package com.baidu.bce.bsc.demo.flink;

import lombok.Getter;
import lombok.Setter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * 使用Flink DataStream接口完成读kafka,写bos的jar作业示例demo
 * 示例中:
 *   - 读kafka使用SSL协议,用户需自行指定kafka的kafka client参数,并利用demo提供的文件下载、解析能力,读取配置文件和bos上的证书zip包
 *   - 从kafka读取的数据为json格式,需要用户自行指定schema
 *   - 写bos进行了按照时间分桶的操作,以及生成commit文件
 */
public class Kafka2Bos {

    // 启动日志处理
    public static Logger logger = org.slf4j.LoggerFactory.getLogger(Kafka2Bos.class);

    // 用于解析json
    public static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {

        // 1. 获取参数
        /**
         * args(0) 为BSC为用户指定的checkpoint目录,无法更改。
         * 如果用户想要使用SPARK的checkpoint功能,只能使用提供的args(0)作为checkpoint目录。
         */
        String checkpointLocation = args[0];
        /**
         * args[1] 为BSC提供的作业运行参数,以base64编码存储,解码后需要解析成Map再使用
         * 参数格式:
         *      key1=value1
         *      key2=value2
         * 本代码中示例:
         *     bootStrapServer=kafka.bj.baidubce.com:9092
         *     topic=test
         *     bosEndpoint=https://bj.bcebos.com
         *     ...
         */
        Map<String, String> variables = new LinkedHashMap<>();
        try {
            String kvStr = new String(Base64.getDecoder().decode(args[1]));
            Arrays.stream(kvStr.split("\n")).forEach(kv -> {
                String[] variable = kv.split("=");
                variables.put(variable[0], variable[1]);
            });
        } catch (Exception e){
            logger.error("decode job variables failed", e);
            throw new Exception(e);
        }
        /** 为配置参数赋值 */
        String sourceBootstrapServer = variables.get("bootStrapServer");
        String sourceTopic = variables.get("topic");
        String sourceGroupId = variables.get("groupId");
        String certBosUrl = variables.get("certBosUrl");
        String bosSink = variables.get("bosSink");
        String bosDatatimePattern = variables.get("bosDatatimePattern");

        // 2. create StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /** 配置一些运行参数,如检查点参数 */
        env.setStateBackend(new FsStateBackend(checkpointLocation));
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointInterval(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 3. 创建 kafka streaming Source
        // 设置 kafka consumer 参数,并读取 client.properties 中的配置
        Properties sourceProp = new Properties();
        sourceProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceBootstrapServer);
        sourceProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, sourceGroupId);
        ClassLoader classLoader = Kafka2Bos.class.getClassLoader();
        sourceProp.load(classLoader.getResourceAsStream("client.properties"));
        /**
         * 提取kafka value中的数据字段, 配置用户自定义schema
         * 使用RowTypeInfo的形式来配置所有的类型
         *
         * 原始value数据格式由用户上游kafka自定义,只需要能在以下逻辑中可以正确提取数据即可。
         * 以json格式为例
         * {
         *    "stringtype": "lRAhSQgShKn77uD",
         *    "longtype": 1199158871,
         *    "floattype": 0.038981155578358462,
         *    "binarytype": "null",
         *    "integertype": 1,
         *    "bytetype": -58,
         *    "booleantype": true,
         *    "doubletype": 439147658,
         *    "shorttype": 13538
         *  }
         */
        List<DataType> dataTypes = Arrays.asList(
                DataTypes.STRING(),
                DataTypes.BIGINT(),
                DataTypes.FLOAT(),
                DataTypes.BYTES(),
                DataTypes.INT(),
                DataTypes.TINYINT(),
                DataTypes.BOOLEAN(),
                DataTypes.DOUBLE(),
                DataTypes.SMALLINT());
        TypeInformation[] types = dataTypes.stream()
                .map(type -> TypeConversions.fromDataTypeToLegacyInfo(type))
                .collect(Collectors.toList())
                .toArray(new TypeInformation[dataTypes.size()]);
        String[] filedNames = Arrays.asList(
                    "stringtype",
                    "longtype",
                    "floattype",
                    "binarytype",
                    "integertype",
                    "bytetype",
                    "booleantype",
                    "doubletype",
                    "shorttype")
                .toArray(new String[dataTypes.size()]);
        RowTypeInfo typeInfo = new RowTypeInfo(types, filedNames);
        JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema.Builder(typeInfo).build();

        /** 创建 FlinkKafkaConsumer,使用0.10版本 */
        FlinkKafkaConsumer010<Row> flinkKafkaConsumer = new FlinkKafkaConsumer010<Row>(sourceTopic, schema, sourceProp) {
            @Override
            public void open(Configuration configuration) throws Exception {
                this.properties.putAll(copySslFileAndGetLocalProperties(sourceProp, certBosUrl));
                super.open(configuration);
            }
        };
        flinkKafkaConsumer.setStartFromEarliest();
        DataStream<Row> source = env.addSource(flinkKafkaConsumer);

        // 4. 进行一些数据处理pipeline
        // 此处举例为只提取需要的字段, 并转换json为字符串
        /**
         * 输出数据全部字段,但有输出转储的只有如下三个,分别为:
         * {
         *    "stringtype": "lRAhSQgShKn77uD-1",
         *    "longtype": 1199158871,
         *    "floattype": 0.038981155578358462
         * }
         */
        DataStream<String> operatorDataStream = source.map(new MapFunction<Row, KafkaValue>() {
            @Override
            public KafkaValue map(Row row) throws Exception {
                KafkaValue kafkaValue = new KafkaValue();
                kafkaValue.setStringtype(row.getField(0).toString() + "-" + row.getField(4).toString());
                kafkaValue.setLongtype((long)row.getField(1));
                kafkaValue.setFloattype((float)row.getField(2));
                return kafkaValue;
            }
        }).map(new MapFunction<KafkaValue, String>() {
            @Override
            public String map(KafkaValue kafkaValue) throws Exception {
                return objectMapper.writeValueAsString(kafkaValue);
            }
        });

        // 5. 创建 bos streaming Sink
        // 这里使用了按照时间分桶的策略,并设置了滚动策略,配置了一些参数
        StreamingFileSink.RowFormatBuilder bosStreamingFileSinkBuilder = StreamingFileSink
                .forRowFormat(new Path(bosSink), new SimpleStringEncoder<>())
                .withBucketAssigner(new DateTimeBucketAssigner(bosDatatimePattern))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                        .withMaxPartSize(128L * 1024L * 1024L)
                        .build())
                .withBucketCheckInterval(10000);
        StreamingFileWriter fileWriter = new StreamingFileWriter(
                bosStreamingFileSinkBuilder.getBucketCheckInterval(), bosStreamingFileSinkBuilder);
        // 设置了commit算子,用于标志该时间段数据已经写完
        DataStream<StreamingFileCommitter.CommitMessage> writerStream = operatorDataStream
                .transform(StreamingFileWriter.class.getSimpleName(),
                        TypeExtractor.createTypeInfo(CommitMessage.class),
                        (OneInputStreamOperator) fileWriter)
                .setParallelism(1);

        writerStream.addSink(new DiscardingSink()).setParallelism(1);

        // 6. 启动执行
        env.execute("flink-kafka-to-bos-jar-demo");
    }


    /**
     * kafka value 对应的对象格式
     */
    @Getter
    @Setter
    public static class KafkaValue {
        private String stringtype;
        private Long longtype;
        private Float floattype;
        private String binarytype;
        private Integer integertype;
        private Byte bytetype;
        private Boolean booleantype;
        private Double doubletype;
        private Short shorttype;
    }

    /**
     * 从bos下载ssl证书压缩包,解压后加入到配置中
     * 注意:需要先在bos上创建好对应的目录,并设置权限为公共读,并且传入的url应该为普通访问链接,而非CDN加速链接
     * @param properties
     * @return
     */
    private static Properties copySslFileAndGetLocalProperties(Properties properties, String certBosUrl) {
        String userDir = System.getProperty("user.dir");
        /** 尝试3次,如果失败则抛出异常 */
        int i = 0;
        for (; i < 3; i++) {
            try {
                FileUtil.downloadBosFileAndUnzip(userDir, certBosUrl);
                break;
            } catch (IOException e) {
                logger.error("download bos file fail when try: {}", i, e);
            }
        }
        if (i >= 3) {
            throw new RuntimeException("download bos file fail");
        }
        properties.setProperty("ssl.truststore.location", userDir + "/" + properties.getProperty("ssl.truststore.location"));
        properties.setProperty("ssl.keystore.location", userDir + "/" + properties.getProperty("ssl.keystore.location"));
        logger.info("ssl.truststore.location: " + properties.getProperty("ssl.truststore.location"));
        logger.info("ssl.keystore.location: " + properties.getProperty("ssl.keystore.location"));
        return properties;
    }

    /**
     * bos文件下载工具类
     */
    public static class FileUtil {

        private static final String PATH_SEPARATOR = "/";

        /**
         * 通过url下载bos上的xx.tar.gz(设置为公共读权限),
         * 并解压至指定路径生成一个文件目录
         * @param outputDir
         * @param url
         */
        public static void downloadBosFileAndUnzip(String outputDir, String url) throws IOException {
            URLConnection connection = new URL(url).openConnection();
            ZipInputStream zipIn = null;
            try{
                zipIn = new ZipInputStream(
                        new BufferedInputStream(connection.getInputStream()));
                ZipEntry entry;
                while ((entry = zipIn.getNextEntry()) != null ) {
                    if (entry.isDirectory()) {
                        createDirectory(outputDir, entry.getName());
                    } else {
                        File tmpFile = new File(outputDir + PATH_SEPARATOR + entry.getName());
                        OutputStream out = null;
                        try{
                            out = new FileOutputStream(tmpFile);
                            int length = 0;
                            byte[] b = new byte[2048];
                            while ((length = zipIn.read(b)) != -1) {
                                out.write(b, 0, length);
                            }
                        } catch (IOException ex){
                            logger.error("write to {} fail", tmpFile,  ex);
                        } finally {
                            if (out != null) {
                                out.close();
                            }
                        }
                    }
                }
            } catch (IOException ex){
                throw new IOException("解压归档文件出现异常",ex);
            } finally {
                try {
                    if (zipIn != null) {
                        zipIn.close();
                    }
                } catch (IOException ex){
                    throw new IOException("关闭tarFile出现异常",ex);
                }
            }
        }

        /**
         * 构建证书目录
         * @param outputDir
         * @param subDir
         */
        public static void createDirectory(String outputDir, String subDir){
            File file = new File(outputDir + PATH_SEPARATOR + subDir);
            if (!file.exists()) {
                file.mkdir();
            }
        }

    }
}

项目打包

执行 mvn clean package 命令之后,能编译出用于bsc运行的jar包 flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

image.png

2. 新增资源

进入BSC控制台,选择资源管理,点击新增资源按钮。资源类型需要选择为JOB_FILE/JAR,上传方式与jar包大小相关,可以选择bos上传或者本地上传。

image.png

上传完成之后效果如下:

image.png

3. 新增作业

打开BSC控制台,点击新建作业按钮,新建一个FLINK_STREAM/JAR作业如下图示例。

image.png

编辑作业参数

作业开发的富文本编辑框中,配置SPARK jar的参数信息如示例:

-- 函数完整类名
main.class=com.baidu.bce.bsc.demo.flink.Kafka2Bos;
 
-- 完整主类名JAR包的资源名称
main.jar=flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
 
-- 函数参数设置,必须以“main.args.”开头
main.args.bootStrapServer=kafka.gz.baidubce.com:9092;
main.args.topic=topictopictopic;
main.args.groupId=groupgroupgroup;
main.args.certBosUrl=https://bucket.bj.bcebos.com/kafka-key.zip;
main.args.bosSink=bos://bucket/object;
main.args.bosDatatimePattern=yyyy-MM-dd--HH;

引用jar资源

资源引用栏中选择刚才上传的jar包,点击引用;并将资源详情中的资源原名作为作业参数main.jar的参数

image.png

image.png

保存发布作业

依次点击保存、发布按钮,将作业发布到作业运维列表。

4. 运行作业

切换到作业运维的详情页面,点击启动,选择相应的网络参数并申请启动资源,点击确认。如果使用了检查点,可以选择从上次作业停止时间点启动

image.png

启动之后,可以看到作业运行日志,但jar作业不支持实时监控的查看

5. 更新作业

如果需要更新作业jar包,需要按照如下步骤执行:

  1. 停止运行中的作业。
  2. 在资源管理列表对相应的jar包发起"新增版本"操作。
  3. 在作业开发页面对相应作业的资源引用执行"解绑-引用"操作。
  4. 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
  5. 重启启动作业。

如果仅仅是需要修改作业参数,可以简化步骤为:

  1. 停止运行中的作业。
  2. 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
  3. 重启启动作业。
上一篇
SQL使用手册
下一篇
Spark自定义JAR作业