Spark任务开发手册
更新时间:2026-07-03
本文档用于指导开发者在百度胜算平台开发、运行 PySpark 任务,覆盖 Spark 配置生成、业务脚本编写、任务提交运行全流程。通过平台内置工具可自动生成鉴权、存储关联配置,规避手动配置 AK/SK、集群参数的繁琐操作,适用于元数据查询、数据表读写、Volume 存储文件读写等常见 Spark 开发场景。
步骤一:生成Spark配置文件
因为需要使用到 aksk 等信息访问存储、鉴权等服务,对用户手动执行过于繁琐,因此通过执行以下平台内置命令,自动生成 spark-defaults.conf 配置文件:
Bash
1databuilder spark confgen
生成的 conf 文件默认在 /opt/spark/conf/spark-defaults.conf 位置。您也可以通过 --output-dir 修改保存位置。
步骤二:PySpark脚本开发
配置文件生成完成后,您可基于 PySpark 编写业务脚本,支持元数据表读写、Volume 存储空间文件读写两大核心场景,以下为可直接复用的标准示例脚本。以访问元数据的 PySpark 为例:
- 访问元数据表
Python
1import sys
2import uuid
3import pyspark
4from pyspark.sql import SparkSession
5
6def main():
7 spark = SparkSession.builder.appName("PySparkSample").getOrCreate()
8
9 log4jLogger = spark._jvm.org.apache.log4j
10 logger = log4jLogger.LogManager.getLogger(__name__)
11 logger.info("start to run pyspark")
12 try:
13 df = spark.createDataFrame([(14, "Tom"), (23, "Alice"), (16, "Bob")], ["da", "val"])
14 df.writeTo("testcatalog.default.testtab").append()
15
16 df = spark.table("testcatalog.default.testtab")
17 df.show(100)
18 spark.stop()
19 except Exception as e:
20 logger.info(f"test try catch: 发生错误: {e}")
21 raise
22
23if __name__ == "__main__":
24 main()
- 访问volume
Python
1from pyspark.sql import SparkSession
2
3spark = SparkSession.builder.appName("PySparkSample").getOrCreate()
4
5df = spark.sql("""select 'hello' as col1, 'world' as col2, '!' as col3""")
6
7file_path = "/Volumes/zhengyafei/default/test"
8
9df.write.format("csv").mode("overwrite").option("header", "true").option("sep", ",").save(file_path)
10
11df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("sep", ",").load(file_path)
12
13df.show(5)
14
15df = spark.sql("""SELECT * FROM csv.`/Volumes/zhengyafei/default/test`""")
16
17df.show(5)
18spark.stop()
使用说明:使用时需将
/Volumes/zhengyafei/default/test替换为业务实际的 Volume 存储路径。
Spark任务提交运行
脚本编写完成后,通过平台标准 spark-submit 命令提交任务,执行正式运行。将脚本文件命名为 test.py(可自定义名称),在终端执行以下命令:
Python
1/opt/spark/bin/spark-submit test.py
评价此篇文章
