Spark自定义JAR作业
更新时间:2024-01-04
背景
BSC 产品支持用户提交SPARK自定义jar作业,以读KAFKA写BOS为例,其具体步骤如下:
步骤
1. 开发作业
开发环境
使用IDEA进行开发,项目管理使用maven,相关版本为
名称 | 版本 |
---|---|
java | 1.8 |
scala | 2.11 |
spark | 2.4.6 |
项目结构
整体项目结构如下图所示。由于示例中KAFKA使用SSL协议,根据代码逻辑将KAFKA提供的SSL证书以及对应kafka配置文件都放在resource下一起打包到jar中。
pom文件
pom文件如下所示,相关的注意事项见文件注释
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 指定相关依赖的版本号 -->
<properties>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.6</spark.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scope.setting>provided</scope.setting>
</properties>
<dependencies>
<!-- 1、 bsc运行环境中包含 spark 核心依赖,所以下面涉及到的 spark 核心依赖无需打到项目jar中,
在打包的时候需要指定scope为provided
-->
<!-- spark 核心依赖:spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${scope.setting}</scope>
</dependency>
<!-- spark 核心依赖:spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${scope.setting}</scope>
</dependency>
<!-- 2、 bsc运行环境中不包含 spark connector依赖,所以下面涉及到的 spark connector 依赖需要打到项目jar中。
注意:
- bos相关的依赖无须额外引用,
- kafka必须使用0.10版本的connector和client
-->
<!-- spark对接kafka streaming相关依赖,需要打到jar中 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark对接kafka sql相关依赖,需要打到jar中 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 3、 下面可以引用 spark 之外的一些依赖 -->
</dependencies>
<build>
<plugins>
<plugin>
<!-- scala编译插件 -->
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 按照上述的逻辑,必须打fat-jar才能把所有依赖提交到bsc中,因此需要使用此打包插件 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
demo代码
读kafka写bos的代码如下:
package com.baidu.bce.bsc.demo.spark
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.{File, FileOutputStream, InputStream}
import java.util.{Base64, Properties}
/**
* 使用SPARK DataStream接口完成读kafka,写bos的jar作业示例demo
* 示例中:
* - 读kafka使用SSL协议,用户需自行指定kafka的kafka client参数,并利用demo提供的文件解析能力,读取证书和配置
* - 从kafka读取的数据为json格式,需要用户自行指定schema
* - 写bos需要用户提供永久AK/SK,并指定bosEndpoint和bosSinkPath
*/
object Kafka2Bos {
def className = {
this.getClass.getName.stripSuffix("$")
}
// 启动日志记录器
def logger = {
org.slf4j.LoggerFactory.getLogger(className)
}
// 为抽取jar中的文件设置常量
val keyStoreName = "client.keystore.jks"
val trustStoreName = "client.truststore.jks"
val keyDir = "/kafka-key/"
def main(args: Array[String]) {
// 1. 获取参数
/**
* args(0) 为BSC为用户指定的checkpoint目录,无法更改。
* 如果用户想要使用SPARK的checkpoint功能,只能使用提供的args(0)作为checkpoint目录。
*/
val checkpointLocation = args(0)
/**
* args(1) 为用户提供的作业运行参数,以base64编码存储,解码后需要解析成Map再使用
* 参数格式:
* key1=value1
* key2=value2
* 本代码中示例:
* bootStrapServer=kafka.bj.baidubce.com:9092
* topic=test
* bosEndpoint=https://bj.bcebos.com
* ...
*/
var variables: Map[String, String] = Map()
try {
// 解析参数
val kvStr = new String(Base64.getDecoder().decode(args(1)))
kvStr.split("\n").foreach { kv =>
val variable = kv.split("=")
variables += (variable(0) -> variable(1))
}
} catch {
case e: Exception =>
logger.error("decode job variables failed", e)
throw e
}
/** 为配置参数赋值 */
val sourceBootstrapServer = variables("bootStrapServer")
val sourceTopic = variables("topic")
val sinkBosEndpoint = variables("bosEndpoint")
val userAK = variables("bosUserAK")
val userSK = variables("bosUserSK")
val bosSink = variables("bosSink")
// 2. 从资源文件获取kafka的SSL协议client配置
val sslProp: Properties = new Properties()
var in: InputStream = null
try {
in = this.getClass.getResourceAsStream("/kafka-key/client.properties")
sslProp.load(in)
} finally {
if (in != null) {
in.close
}
}
/** 打印client配置内容 */
sslProp.list(System.out)
// 3. 从jar中抽取 client.keystore.jks & client.truststore.jks
val currentDir = System.getProperty("user.dir")
val keyStoreDist = currentDir + File.separator + keyStoreName
val trustStoreDist = currentDir + File.separator + trustStoreName
extractFileFromJar(keyStoreDist, keyDir + keyStoreName)
extractFileFromJar(trustStoreDist, keyDir + trustStoreName)
// 4. create SparkSession 以及一些基本配置
val spark = SparkSession.builder
.appName("KafkaToBosJob")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
/** add cert file */
spark.sparkContext.addFile(keyStoreDist)
spark.sparkContext.addFile(trustStoreDist)
import spark.implicits._
// 5. 创建 kafka source
val source = createKafkaSource(spark, sourceBootstrapServer, sourceTopic, sslProp)
// 6. spark operation pipeline
/**
* 操作value中的数据字段
* 原始value数据格式由用户上游kafka自定义,只需要能在以下逻辑中可以正确提取数据即可。
* 以json格式为例
* {
* "stringtype": "lRAhSQgShKn77uD",
* "longtype": 1199158871,
* "floattype": 0.038981155578358462,
* "binarytype": "null",
* "integertype": 1,
* "bytetype": -58,
* "booleantype": true,
* "doubletype": 439147658,
* "shorttype": 13538
* }
*/
/** 对于json格式,我们使用schema来定义结构 */
val schema = new StructType().add("stringtype", StringType).add("longtype", LongType)
.add("floattype", FloatType).add("binarytype", BinaryType)
.add("integertype", IntegerType).add("bytetype", ByteType)
.add("booleantype", BooleanType).add("doubletype", DoubleType)
.add("shorttype", ShortType)
/** 将kafka record中的value字段(实际数据)取出来转换成schema,并取出stringtype、longtype、floattype三个字段 */
val kafkaStreamDF = source
.selectExpr("CAST(value as STRING)")
.select(from_json($"value", schema).alias("json_data"))
.select($"json_data.stringtype", $"json_data.longtype", $"json_data.floattype")
// 7. 将操作过后的数据以csv格式写入bos
/** 配置baidu bos file system参数 */
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.bos.endpoint", s"${sinkBosEndpoint}")
hadoopConf.set("fs.bos.access.key", s"${userAK}")
hadoopConf.set("fs.bos.secret.access.key", s"${userSK}")
hadoopConf.set("fs.bos.impl", "org.apache.hadoop.fs.bos.BaiduBosFileSystem")
hadoopConf.setBoolean("iam.sts.enabled", false)
hadoopConf.setLong("fs.bos.readahead.size", 5242880)
val sink=kafkaStreamDF.writeStream
.format("csv") //sink的类型,必填
.outputMode("append") //输出模式
.option("path", s"${bosSink}")
.option("truncate", "false")
.option("checkpointLocation", s"${checkpointLocation}") //从args(0)指定checkpoint位置
val query = sink.start()
query.awaitTermination()
} // end main
/**
* 创建kafka source,设置基本配置参数,以及ssl配置
* @param spark
* @param sourceBootstrapServer
* @param topic
* @param sslProp
* @return
*/
def createKafkaSource(
spark: SparkSession,
sourceBootstrapServer: String,
topic: String,
sslProp: Properties): DataFrame = {
val source = spark.readStream
.format("kafka") //source的类型,必填
.option("kafka.bootstrap.servers", s"${sourceBootstrapServer}") //endpoint和端口,必填
.option("subscribe", s"${topic}") //topic,必填
.option("startingOffsets", "earliest") //读取offset的起始位置,latest/earliest,必填
// set sslProp to source option
var iter = sslProp.stringPropertyNames.iterator()
while (iter.hasNext) {
val key = iter.next()
source.option(s"kafka.$key", sslProp.getProperty(key))
}
source.load()
}
/**
* 从jar中抽取文件到本地
* @param dist
* @param source
*/
def extractFileFromJar(dist: String, source: String) = {
try {
val inStream = this.getClass.getResourceAsStream(source)
val fos = new FileOutputStream(dist);
val buffer = new Array[Byte](10240);
var size: Int = 0;
while (size != -1) {
fos.write(buffer, 0, size);
size = inStream.read(buffer); //Scala中这个Size的位置很重要
}
inStream.close();
fos.close();
} catch {
case t: Throwable =>
t.printStackTrace
throw t
}
}
}
项目打包
执行 mvn clean package
命令之后,能编译出用于bsc运行的jar包 spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
2. 新增资源
进入BSC控制台,选择资源管理,点击新增资源按钮。资源类型需要选择为JOB_FILE/JAR,上传方式与jar包大小相关,可以选择bos上传或者本地上传。
上传完成之后效果如下:
3. 新增作业
打开BSC控制台,点击新建作业按钮,新建一个SPARK_STREAM/JAR作业如下图示例。
编辑作业参数
在作业开发的富文本编辑框中,配置SPARK jar的参数信息如示例:
-- 函数完整类名
main.class=com.baidu.bce.bsc.demo.spark.Kafka2Bos;
-- 完整主类名JAR包的资源名称
main.jar=spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
-- 函数参数设置,必须以“main.args.”开头
main.args.bootStrapServer=kafka.gz.baidubce.com:9092;
main.args.topic=topictopictopic;
main.args.bosEndpoint=http://gz.bcebos.com;
main.args.bosUserAK=akakakakakakakakakakakakak;
main.args.bosUserSK=sksksksksksksksksksksksksk;
main.args.bosSink=bos://bucket/object;
引用jar资源
在资源引用栏中选择刚才上传的jar包,点击引用;并将资源详情中的资源原名作为作业参数main.jar的参数
保存发布作业
依次点击保存、发布按钮,将作业发布到作业运维列表。
4. 运行作业
切换到作业运维的详情页面,点击启动,选择相应的网络参数并申请启动资源,点击确认。如果使用了检查点,可以选择从上次作业停止时间点启动
启动之后,可以看到作业运行日志,但jar作业不支持实时监控的查看。
5. 更新作业
如果需要更新作业jar包,需要按照如下步骤执行:
- 停止运行中的作业。
- 在资源管理列表对相应的jar包发起"新增版本"操作。
- 在作业开发页面对相应作业的资源引用执行"解绑-引用"操作。
- 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
- 重启启动作业。
如果仅仅是需要修改作业参数,可以简化步骤为:
- 停止运行中的作业。
- 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
- 重启启动作业。