Spark常用配置
所有文档

          百度数据工厂 Pingo

          Spark常用配置

          Spark常用配置

          作业提交相关

          主要包含任务提交运行配置和driver、executor环境配置相关,一旦任务启动后,将无法修改这些配置
          作业提交相关配置的配置方式有如下2种:

          • notebook

            在notebook中,可以通过%%configure -f指定配置,详细使用方法请通过%%help查看。
            注意,json内容要放到第二行

            	```
            	%%configure -f   
            	{"queue": "default", "conf": {"spark.app.name": "test", "spark.pyspark.python": "./python27/bin/python"}}
            	```
          • Spark Plugin

            Spark Plugin也支持用户配置Spark配置,用户只需要点击高级选项,将配置填写到配置信息中即可。
            注意:一行写一个配置,配置项和配置值用空格分隔

            image.png

            	```
            	spark.app.name spark_plugin_test  
            	spark.pyspark.python ./python27/bin/python
            	```

          application 提交相关

          • spark.app.name

            Spark Application作业的名称,会显示在日志和history中

          • spark.files

            依赖配置,后面跟","间隔的多个file路径。

            通过files依赖的文件将部署到driver和executor运行目录中,需要读取时,直接从当前目录读取即可

          • spark.submit.pyFiles

            依赖配置,后面跟","间隔的多个pyFile路径。

            通过pyFiles依赖的文件将部署到driver和executor运行目录中,并自动添加到Python的LD_LIBRARY_PATH中,需要引用时,直接from … import即可

          • spark.jars

            依赖配置,后面跟","间隔的多个jar路径。

            通过jars依赖的文件将部署到driver和executor运行目录中,并自动添加到Java的CLASSPATH中,需要引用时,直接import即可

          • spark.pyspark.python

            PySpark运行的python bin文件所在本地路径,同时对dirver和executor生效

          yarn提交相关

          • spark.yarn.stagingDir

            Application提交执行的临时目录,用于存放Application运行相关的jar包

          • spark.yarn.max.executor.failures

            Application内允许executor挂掉的个数

            默认是executor个数*2,最少为3次

          • spark.yarn.dist.archives

            依赖配置,后面跟","间隔的多个压缩包路径。

            通过archives依赖的文件将部署到driver和executor运行目录中,并自动解压,解压后的目录名默认为压缩包名,可通过#定义别名。例如java.tar.gz#java解压后目录名为java

          driver启动特殊配置

          • spark.yarn.appMasterEnv.[EnvironmentVariableName]
            配置Driver运行过程中的环境变量,变量名为EnvironmentVariableName,值为配置的值
          • spark.driver.extraClassPath
            将jar或者目录(均为执行机本地目录)添加到Driver运行的ClassPath中
          • spark.driver.extraJavaOptions
            Driver JVM启动的额外参数,如GC等
          • spark.driver.extraLibraryPath
            Driver JVM启动时加载的特殊库路径,例如.so路径
          • spark.driver.userClassFirst
            Driver加载类时,是否将用户添加的jar放在classpath的最前面,最先加载。一般用于缓解类冲突的问题,但并没有真正解决。

          executor启动特殊配置

          • spark.executorEnv.[EnvironmentVariableName]
            配置Executor运行过程中的环境变量,变量名为EnvironmentVariableName,值为配置的值
          • spark.executor.extraClassPath
            将jar或者目录(均为执行机本地目录)添加到Executor运行的ClassPath中
          • spark.executor.extraJavaOptions
            Executor JVM启动的额外参数,如GC等
          • spark.executor.extraLibraryPath
            Executor JVM启动时加载的特殊库路径,例如.so路径
          • spark.executor.userClassFirst
            Executor加载类时,是否将用户添加的jar放在classpath的最前面,最先加载。一般用于缓解类冲突的问题,但并没有真正解决。

          资源配置相关

          driver和executor资源配置

          • spark.driver.memory
            driver进程使用的内存数(例如5g)
          • spark.driver.memoryOverhead
            driver JVM堆内存的大小,默认为max(384, 0.1 * spark.driver.memory) ,一般用于运行非JVM任务,例如Python、C++代码。
          • spark.driver.cores
            driver程序使用的CPU内核数(一般需要保证memory/cores >= 1) cores个数代表了任务并发个数,对于driver,一般cores不需要调大
          • spark.executor.memory
            executor进程的内存大小(例如5g)
          • spark.exeuctor.memoryOverhead
            executor JVM堆外内存大小,包括非JVM的运行环境
          • spark.executor.cores
            每个executor的core数目。每个core同一时间只能执行一个Task线程。 每个task分配的内存大小是executor-memory/executor-cores,可以按照这个分析每个task所占用的内存大小 一般情况下(memory/cores >= 1g)
          • spark.memory.offHeap.enabled
            是否使用堆外内存运行任务,这部分内存不包含在executor memory中
          • saprk.memory.offHeap.size
            设置堆外内存大小,单位为B,例如5000000000表示5G 在spark.memory.offHeap.enabled设为true时启用
          • spark.python.worker.memory
            Python worker运行的内存大小,默认为512m
          • spark.memoryfraction
            默认0.75,用于存放缓存数据和运行数据,剩余0.25为User Memory,存放用户定义的数据结构和Spark元数据信息 在用户persist大量数据或者shuffle聚合数据量比较大时可以考虑增加该值
          • spark.memory.storageFraction
            默认0.5, storage内存大小,用于存储缓存数据,剩余空间用于execute 在Unified Memory Manage模式下,内存会自动调整,分配storage和execute使用,但是在storage内存不足时,会要回所有分配的内存 在用户shuffle处理数据比较大时可减小该参数

          executor启动资源配置相关

          静态资源配置

          • spark.executor.instances
            Application运行时的executor个数,静态配置executor个数,资源不会进行伸缩。当资源不够时,实际申请到的executor个数会少于该值,一旦资源空闲,会自动补充。 若spark.dynamicAllocation.initialExecutors配置的值小于该值,则用该值替换spark.dynamicAllocation.initialExecutors的值

          application 配置相关

          application 网络配置相关

          • spark.network.timeout
            所有网络交互的超时时间,默认为120s 默认包含一下配置:spark.core.connection.ack.wait.timeout,spark.storage.blockManagerSlaveTimeoutMs,spark.shuffle.io.connectionTimeout,spark.rpc.askTimeout,spark.rpc.lookupTimeout
          • spark.executor.heartbeatInterval
            executor心跳间隔,默认为10s 心跳间隔应该要小于spark.network.timeout
          • spark.rpc.numRetries
            RPC连接失败的重试次数
          • spark.sql.broadcastTimeout
            broadcast数据的超时等待时间,专门用在broadcast join中

          application shuffle配置相关

          • spark.shuffle.spill
            shuffle读取数据时是否允许数据落盘,默认开启
          • spark.shuffle.spill.numElementsForceSpillThreshold
            shuffle读取数据超过阈值时会自动落盘 默认为256000000,单位是B
          • spark.shuffle.spill.compress
            在落盘时是否压缩数据,默认开启
          • spark.shuffle.io.maxRetries
            shuffle时I/O异常自动重试次数,默认为3次

          application并行配置相关

          注意:并行配置不是越大越好的,并行度越大,会导致任务过于分散,大量的时间花在任务调度,而实际计算时间花费较少。

          • spark.default.parallelism
            shuffle时RDD的partition个数,作用于RDD API, 可被RDD API覆盖 该配置将影响最终输出文件个数
          • spark.sql.shuffle.partitions
            shuffle时RDD的partition个数,作用于Dataset API/SQL查询,可被coalesceNum和Repartition覆盖 该配置将影响最终输出文件个数

          application 输出配置相关

          • spark.driver.maxResultSize
            Spark Action操作(例如collect)返回给driver的总的数据大小,默认为1g
          • spark.hadoop.validateOutputSpecs
            在使用SaveAsHadoopFile等RDD输出API时,是否允许数据已存在。如果设置为true,则会自动覆盖输出目录,否则目录存在时会抛异常。
          • spark.sql.output.merge
            是否开启最后stage的merge,只能作用于写入table操作,不能作用于其他类型的任务
          • spark.sql.output.coalesceNum
            设置最终输出的文件个数,如果少于最后的partition个数,会另起shuffle和新的stage用来重新拆分数据

          application 运行配置相关

          • spark.sql.catalogImplementation
            配置[hive/in-memory]作为metastore,默认in-memory。这个配置只能在写DataFrame时使用,SQL查询时默认配置hive且不允许修改。
            在写非SQL代码时,SparkSession初始化时默认是用in-memory,不会加载hive-site.xml,如果需要访问元数据,需要在初始化时添加enableHiveSupport,设置metastore为hive。 若先创建SparkContext,再创建SparkSession,则enableHiveSupport并不会起作用,需要在初始化SparkContext时添加spark.sql.catalogImplementation=hive。
          • spark.speculation
            是否开启Spark预测执行 开启后,当一个stage的任务task完成度超过75%,且部分task严重超出平均task耗时时,会自动启动新的task attemp并行跑任务,有效缓解慢节点的问题。Spark能够自动识别task,一旦有一个task完成,重试的task会自动失败。
          • spark.hadoop.outputCommitCoordination.enabled
            开启spark.speculation后必须打开 用来保证task重试时多个task只会有一个有效输出,不会产生冗余文件
          • spark.serializer
            Spark在网络传输和缓存数据时使用的序列化类,默认org.apache.spark.serializer. JavaSerializer,推荐org.apache.spark.serializer.KryoSerializer
          • spark.extraListeners
            逗号分隔的SparkListener用户具体实现类List
          • spark.broadcast.blockSize
            broadcast时每个block的大小,默认4m
          • spark.io.compression.codec
            Spark内部数据(RDD、日志等)的编码器,默认lz4,推荐使用 可选配置:lz4, lzf, snappy, and zstd
          • spark.stage.maxConsecutiveAttempts
            stage连续重试最大次数,默认4 此配置配置的是连续重试次数,一般发生于上游数据获取不到,通过DAG重启上游任务重新计算时。
          • spark.files.maxPartitionBytes
            读取文件时一个partition读取的最大字节数,单位是B,默认134217728 (即128 MB)
          • spark.scheduler.mode
            SparkContext调度job的模式,默认为FIFO(先入先出),也可以配置为FAIR(公平调度,会存在资源竞争),推荐使用FAIR
          • spark.task.maxFailures
            一个task的最大重试次数,默认为4,一旦超过会导致Application失败
          • spark.sql.autoBroadcastJoinThreshold
            Spark自动触发BroadcastJoin的阈值,默认10485760 (10 MB) 在小表join大表时,如果小表数据没有超过阈值,会自动使用BroadcastJoin(类似Hive的MapJoin)
          • spark.sql.jsonGenerator.struct.ignore.null
            Spark内部生成Json数据时,是否忽略struct类型的空数据。 若为true,则可能使用DataSet.toJson获取到的数据列与实际不符;若为false,则空数据会自动注为null
          • spark.sql.session.timeZone
            设置Spark运行的默认时区,在事件处理中比较重要

          Structured Streaming配置相关

          • spark.sql.streaming.checkpointLocation
            StructuredStreaming checkpoint的地址。
            若用户通过API配置了option("checkpointLocation",pathA),会优先使用pathA,重启后路径一致,可保证不丢不重;若用户通过API配置了queryName("xxx"),会使用checkpointLocation+queryName作为checkpoint路径,重启后路径一致,可保证不丢不重;若用户未配置queryName,实际路径为checkpointLocation+uuid,重启后路径不一致,不保证不丢不重,仅用于测试

          界面相关配置

          运行中界面相关配置

          • spark.ui.retainedJobs
            运行中Jobs页面展示的job个数,默认1000
          • spark.ui.retainedStages
            运行中stages页面展示的stage个数,默认1000

          History配置相关

          • spark.eventLog.enable
            是否记录Spark events。若为true,可通过history server查看application的历史运行记录,建议开启
          • spark.exentLog.compress
            是否压缩eventLog,默认false,建议开启
          • spark.history.ui.port
            history server的端口配置
          • spark.yarn.historyServer.address
            history server的地址

          完整配置列表请参见spark官方文档

          https://spark.apache.org/docs/2.3.3/configuration.html

          Spark Python API

          https://spark.apache.org/docs/2.3.3/api/python/

          Spark Scala API

          https://spark.apache.org/docs/2.3.3/api/scala/

          Spark快速入门

          https://spark.apache.org/docs/2.3.3/quick-start.html

          上一篇
          SQL参考手册
          下一篇
          常见问题