Spark自定义JAR作业
所有文档
menu

百度流式计算 BSC

Spark自定义JAR作业

产品详情

背景

BSC 产品支持用户提交SPARK自定义jar作业,以读KAFKA写BOS为例,其具体步骤如下:

步骤

1. 开发作业

开发环境

使用IDEA进行开发,项目管理使用maven,相关版本为

名称 版本
java 1.8
scala 2.11
spark 2.4.6

项目结构

整体项目结构如下图所示。由于示例中KAFKA使用SSL协议,根据代码逻辑将KAFKA提供的SSL证书以及对应kafka配置文件都放在resource下一起打包到jar中。

image.png

pom文件

pom文件如下所示,相关的注意事项见文件注释

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--  指定相关依赖的版本号  -->
    <properties>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.6</spark.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scope.setting>provided</scope.setting>
    </properties>

    <dependencies>
        <!--  1、 bsc运行环境中包含 spark 核心依赖,所以下面涉及到的 spark 核心依赖无需打到项目jar中,
              在打包的时候需要指定scope为provided
        -->
        <!--  spark 核心依赖:spark-sql  -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.setting}</scope>
        </dependency>
        <!--  spark 核心依赖:spark-streaming  -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.setting}</scope>
        </dependency>


        <!--  2、 bsc运行环境中不包含 spark connector依赖,所以下面涉及到的 spark connector 依赖需要打到项目jar中。
                注意:
                   - bos相关的依赖无须额外引用,
                   - kafka必须使用0.10版本的connector和client
        -->
        <!--  spark对接kafka streaming相关依赖,需要打到jar中 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--  spark对接kafka sql相关依赖,需要打到jar中 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!--  3、 下面可以引用 spark 之外的一些依赖  -->

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!--  scala编译插件  -->
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--  按照上述的逻辑,必须打fat-jar才能把所有依赖提交到bsc中,因此需要使用此打包插件 -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

demo代码

读kafka写bos的代码如下:

package com.baidu.bce.bsc.demo.spark

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.io.{File, FileOutputStream, InputStream}
import java.util.{Base64, Properties}

/**
 * 使用SPARK DataStream接口完成读kafka,写bos的jar作业示例demo
 * 示例中:
 *   - 读kafka使用SSL协议,用户需自行指定kafka的kafka client参数,并利用demo提供的文件解析能力,读取证书和配置
 *   - 从kafka读取的数据为json格式,需要用户自行指定schema
 *   - 写bos需要用户提供永久AK/SK,并指定bosEndpoint和bosSinkPath
 */
object Kafka2Bos {

  def className = {
    this.getClass.getName.stripSuffix("$")
  }

  // 启动日志记录器
  def logger = {
    org.slf4j.LoggerFactory.getLogger(className)
  }

  // 为抽取jar中的文件设置常量
  val keyStoreName = "client.keystore.jks"
  val trustStoreName =  "client.truststore.jks"
  val keyDir = "/kafka-key/"

  def main(args: Array[String]) {

    // 1. 获取参数
    /**
     * args(0) 为BSC为用户指定的checkpoint目录,无法更改。
     * 如果用户想要使用SPARK的checkpoint功能,只能使用提供的args(0)作为checkpoint目录。
     */
    val checkpointLocation = args(0)
    /**
     * args(1) 为用户提供的作业运行参数,以base64编码存储,解码后需要解析成Map再使用
     * 参数格式:
     *      key1=value1
     *      key2=value2
     * 本代码中示例:
     *     bootStrapServer=kafka.bj.baidubce.com:9092
     *     topic=test
     *     bosEndpoint=https://bj.bcebos.com
     *     ...
     */
    var variables: Map[String, String] = Map()
    try {
      // 解析参数
      val kvStr = new String(Base64.getDecoder().decode(args(1)))
      kvStr.split("\n").foreach { kv =>
        val variable = kv.split("=")
        variables += (variable(0) -> variable(1))
      }
    } catch {
      case e: Exception =>
        logger.error("decode job variables failed", e)
        throw e
    }
    /** 为配置参数赋值 */
    val sourceBootstrapServer = variables("bootStrapServer")
    val sourceTopic = variables("topic")
    val sinkBosEndpoint = variables("bosEndpoint")
    val userAK = variables("bosUserAK")
    val userSK = variables("bosUserSK")
    val bosSink = variables("bosSink")

    // 2. 从资源文件获取kafka的SSL协议client配置
    val sslProp: Properties = new Properties()
    var in: InputStream = null
    try {
      in = this.getClass.getResourceAsStream("/kafka-key/client.properties")
      sslProp.load(in)
    } finally {
      if (in != null) {
        in.close
      }
    }
    /** 打印client配置内容 */
    sslProp.list(System.out)

    // 3. 从jar中抽取 client.keystore.jks & client.truststore.jks
    val currentDir = System.getProperty("user.dir")
    val keyStoreDist = currentDir + File.separator + keyStoreName
    val trustStoreDist = currentDir + File.separator + trustStoreName
    extractFileFromJar(keyStoreDist, keyDir + keyStoreName)
    extractFileFromJar(trustStoreDist, keyDir + trustStoreName)

    // 4. create SparkSession 以及一些基本配置
    val spark = SparkSession.builder
      .appName("KafkaToBosJob")
      .config("spark.sql.shuffle.partitions", "5")
      .getOrCreate()
    /** add cert file */
    spark.sparkContext.addFile(keyStoreDist)
    spark.sparkContext.addFile(trustStoreDist)

    import spark.implicits._

    // 5. 创建 kafka source
    val source = createKafkaSource(spark, sourceBootstrapServer, sourceTopic, sslProp)

    // 6. spark operation pipeline
    /**
     * 操作value中的数据字段
     * 原始value数据格式由用户上游kafka自定义,只需要能在以下逻辑中可以正确提取数据即可。
     * 以json格式为例
     * {
     *    "stringtype": "lRAhSQgShKn77uD",
     *    "longtype": 1199158871,
     *    "floattype": 0.038981155578358462,
     *    "binarytype": "null",
     *    "integertype": 1,
     *    "bytetype": -58,
     *    "booleantype": true,
     *    "doubletype": 439147658,
     *    "shorttype": 13538
     *  }
     */
    /** 对于json格式,我们使用schema来定义结构 */
    val schema = new StructType().add("stringtype", StringType).add("longtype", LongType)
      .add("floattype", FloatType).add("binarytype", BinaryType)
      .add("integertype", IntegerType).add("bytetype", ByteType)
      .add("booleantype", BooleanType).add("doubletype", DoubleType)
      .add("shorttype", ShortType)
    /** 将kafka record中的value字段(实际数据)取出来转换成schema,并取出stringtype、longtype、floattype三个字段 */
    val kafkaStreamDF = source
      .selectExpr("CAST(value as STRING)")
      .select(from_json($"value", schema).alias("json_data"))
      .select($"json_data.stringtype", $"json_data.longtype", $"json_data.floattype")

    // 7. 将操作过后的数据以csv格式写入bos
    /** 配置baidu bos file system参数 */
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.bos.endpoint", s"${sinkBosEndpoint}")
    hadoopConf.set("fs.bos.access.key", s"${userAK}")
    hadoopConf.set("fs.bos.secret.access.key", s"${userSK}")
    hadoopConf.set("fs.bos.impl", "org.apache.hadoop.fs.bos.BaiduBosFileSystem")
    hadoopConf.setBoolean("iam.sts.enabled", false)
    hadoopConf.setLong("fs.bos.readahead.size", 5242880)
    val sink=kafkaStreamDF.writeStream
      .format("csv")	//sink的类型,必填
      .outputMode("append")	//输出模式
      .option("path", s"${bosSink}")
      .option("truncate", "false")
      .option("checkpointLocation", s"${checkpointLocation}") //从args(0)指定checkpoint位置

    val query = sink.start()
    query.awaitTermination()
  } // end main


  /**
   * 创建kafka source,设置基本配置参数,以及ssl配置
   * @param spark
   * @param sourceBootstrapServer
   * @param topic
   * @param sslProp
   * @return
   */
  def createKafkaSource(
      spark: SparkSession,
      sourceBootstrapServer: String,
      topic: String,
      sslProp: Properties): DataFrame = {
    val source = spark.readStream
      .format("kafka")	//source的类型,必填
      .option("kafka.bootstrap.servers", s"${sourceBootstrapServer}")	//endpoint和端口,必填
      .option("subscribe", s"${topic}")	//topic,必填
      .option("startingOffsets", "earliest")	//读取offset的起始位置,latest/earliest,必填

    // set sslProp to source option
    var iter = sslProp.stringPropertyNames.iterator()
    while (iter.hasNext) {
      val key = iter.next()
      source.option(s"kafka.$key", sslProp.getProperty(key))
    }
    source.load()
  }

  /**
   * 从jar中抽取文件到本地
   * @param dist
   * @param source
   */
  def extractFileFromJar(dist: String, source: String) = {
    try {
      val inStream = this.getClass.getResourceAsStream(source)

      val fos = new FileOutputStream(dist);
      val buffer = new Array[Byte](10240);
      var size: Int = 0;
      while (size != -1) {
        fos.write(buffer, 0, size);
        size = inStream.read(buffer); //Scala中这个Size的位置很重要
      }
      inStream.close();
      fos.close();
    } catch {
      case t: Throwable =>
        t.printStackTrace
        throw t
    }
  }
}

项目打包

执行 mvn clean package 命令之后,能编译出用于bsc运行的jar包 spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

image.png

2. 新增资源

进入BSC控制台,选择资源管理,点击新增资源按钮。资源类型需要选择为JOB_FILE/JAR,上传方式与jar包大小相关,可以选择bos上传或者本地上传。

image.png

上传完成之后效果如下:

image.png

3. 新增作业

打开BSC控制台,点击新建作业按钮,新建一个SPARK_STREAM/JAR作业如下图示例。

image.png

编辑作业参数

作业开发的富文本编辑框中,配置SPARK jar的参数信息如示例:

-- 函数完整类名
main.class=com.baidu.bce.bsc.demo.spark.Kafka2Bos;
 
-- 完整主类名JAR包的资源名称
main.jar=spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar; 
 
-- 函数参数设置,必须以“main.args.”开头
main.args.bootStrapServer=kafka.gz.baidubce.com:9092;
main.args.topic=topictopictopic;
main.args.bosEndpoint=http://gz.bcebos.com;
main.args.bosUserAK=akakakakakakakakakakakakak;
main.args.bosUserSK=sksksksksksksksksksksksksk;
main.args.bosSink=bos://bucket/object;

引用jar资源

资源引用栏中选择刚才上传的jar包,点击引用;并将资源详情中的资源原名作为作业参数main.jar的参数

image.png

image.png

保存发布作业

依次点击保存、发布按钮,将作业发布到作业运维列表。

4. 运行作业

切换到作业运维的详情页面,点击启动,选择相应的网络参数并申请启动资源,点击确认。如果使用了检查点,可以选择从上次作业停止时间点启动

image.png

启动之后,可以看到作业运行日志,但jar作业不支持实时监控的查看

5. 更新作业

如果需要更新作业jar包,需要按照如下步骤执行:

  1. 停止运行中的作业。
  2. 在资源管理列表对相应的jar包发起"新增版本"操作。
  3. 在作业开发页面对相应作业的资源引用执行"解绑-引用"操作。
  4. 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
  5. 重启启动作业。

如果仅仅是需要修改作业参数,可以简化步骤为:

  1. 停止运行中的作业。
  2. 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
  3. 重启启动作业。
上一篇
Flink自定义JAR作业
下一篇
常见问题