百度数据工厂Pingo

    Spark常用配置

    Spark常用配置

    作业提交相关

    主要包含任务提交运行配置和driver、executor环境配置相关,一旦任务启动后,将无法修改这些配置
    作业提交相关配置的配置方式有如下2种:

    • notebook

      在notebook中,可以通过%%configure -f指定配置,详细使用方法请通过%%help查看。
      注意,json内容要放到第二行

      	```
      	%%configure -f   
      	{"queue": "default", "conf": {"spark.app.name": "test", "spark.pyspark.python": "./python27/bin/python"}}
      	```
    • Spark Plugin

      Spark Plugin也支持用户配置Spark配置,用户只需要点击高级选项,将配置填写到配置信息中即可。
      注意:一行写一个配置,配置项和配置值用空格分隔

      image.png

      	```
      	spark.app.name spark_plugin_test  
      	spark.pyspark.python ./python27/bin/python
      	```

    application 提交相关

    • spark.app.name

      Spark Application作业的名称,会显示在日志和history中

    • spark.files

      依赖配置,后面跟","间隔的多个file路径。

      通过files依赖的文件将部署到driver和executor运行目录中,需要读取时,直接从当前目录读取即可

    • spark.submit.pyFiles

      依赖配置,后面跟","间隔的多个pyFile路径。

      通过pyFiles依赖的文件将部署到driver和executor运行目录中,并自动添加到Python的LD_LIBRARY_PATH中,需要引用时,直接from … import即可

    • spark.jars

      依赖配置,后面跟","间隔的多个jar路径。

      通过jars依赖的文件将部署到driver和executor运行目录中,并自动添加到Java的CLASSPATH中,需要引用时,直接import即可

    • spark.pyspark.python

      PySpark运行的python bin文件所在本地路径,同时对dirver和executor生效

    yarn提交相关

    • spark.yarn.stagingDir

      Application提交执行的临时目录,用于存放Application运行相关的jar包

    • spark.yarn.max.executor.failures

      Application内允许executor挂掉的个数

      默认是executor个数*2,最少为3次

    • spark.yarn.dist.archives

      依赖配置,后面跟","间隔的多个压缩包路径。

      通过archives依赖的文件将部署到driver和executor运行目录中,并自动解压,解压后的目录名默认为压缩包名,可通过#定义别名。例如java.tar.gz#java解压后目录名为java

    driver启动特殊配置

    • spark.yarn.appMasterEnv.[EnvironmentVariableName]
      配置Driver运行过程中的环境变量,变量名为EnvironmentVariableName,值为配置的值
    • spark.driver.extraClassPath
      将jar或者目录(均为执行机本地目录)添加到Driver运行的ClassPath中
    • spark.driver.extraJavaOptions
      Driver JVM启动的额外参数,如GC等
    • spark.driver.extraLibraryPath
      Driver JVM启动时加载的特殊库路径,例如.so路径
    • spark.driver.userClassFirst
      Driver加载类时,是否将用户添加的jar放在classpath的最前面,最先加载。一般用于缓解类冲突的问题,但并没有真正解决。

    executor启动特殊配置

    • spark.executorEnv.[EnvironmentVariableName]
      配置Executor运行过程中的环境变量,变量名为EnvironmentVariableName,值为配置的值
    • spark.executor.extraClassPath
      将jar或者目录(均为执行机本地目录)添加到Executor运行的ClassPath中
    • spark.executor.extraJavaOptions
      Executor JVM启动的额外参数,如GC等
    • spark.executor.extraLibraryPath
      Executor JVM启动时加载的特殊库路径,例如.so路径
    • spark.executor.userClassFirst
      Executor加载类时,是否将用户添加的jar放在classpath的最前面,最先加载。一般用于缓解类冲突的问题,但并没有真正解决。

    资源配置相关

    driver和executor资源配置

    • spark.driver.memory
      driver进程使用的内存数(例如5g)
    • spark.driver.memoryOverhead
      driver JVM堆内存的大小,默认为max(384, 0.1 * spark.driver.memory) ,一般用于运行非JVM任务,例如Python、C++代码。
    • spark.driver.cores
      driver程序使用的CPU内核数(一般需要保证memory/cores >= 1) cores个数代表了任务并发个数,对于driver,一般cores不需要调大
    • spark.executor.memory
      executor进程的内存大小(例如5g)
    • spark.exeuctor.memoryOverhead
      executor JVM堆外内存大小,包括非JVM的运行环境
    • spark.executor.cores
      每个executor的core数目。每个core同一时间只能执行一个Task线程。 每个task分配的内存大小是executor-memory/executor-cores,可以按照这个分析每个task所占用的内存大小 一般情况下(memory/cores >= 1g)
    • spark.memory.offHeap.enabled
      是否使用堆外内存运行任务,这部分内存不包含在executor memory中
    • saprk.memory.offHeap.size
      设置堆外内存大小,单位为B,例如5000000000表示5G 在spark.memory.offHeap.enabled设为true时启用
    • spark.python.worker.memory
      Python worker运行的内存大小,默认为512m
    • spark.memoryfraction
      默认0.75,用于存放缓存数据和运行数据,剩余0.25为User Memory,存放用户定义的数据结构和Spark元数据信息 在用户persist大量数据或者shuffle聚合数据量比较大时可以考虑增加该值
    • spark.memory.storageFraction
      默认0.5, storage内存大小,用于存储缓存数据,剩余空间用于execute 在Unified Memory Manage模式下,内存会自动调整,分配storage和execute使用,但是在storage内存不足时,会要回所有分配的内存 在用户shuffle处理数据比较大时可减小该参数

    executor启动资源配置相关

    静态资源配置

    • spark.executor.instances
      Application运行时的executor个数,静态配置executor个数,资源不会进行伸缩。当资源不够时,实际申请到的executor个数会少于该值,一旦资源空闲,会自动补充。 若spark.dynamicAllocation.initialExecutors配置的值小于该值,则用该值替换spark.dynamicAllocation.initialExecutors的值

    application 配置相关

    application 网络配置相关

    • spark.network.timeout
      所有网络交互的超时时间,默认为120s 默认包含一下配置:spark.core.connection.ack.wait.timeout,spark.storage.blockManagerSlaveTimeoutMs,spark.shuffle.io.connectionTimeout,spark.rpc.askTimeout,spark.rpc.lookupTimeout
    • spark.executor.heartbeatInterval
      executor心跳间隔,默认为10s 心跳间隔应该要小于spark.network.timeout
    • spark.rpc.numRetries
      RPC连接失败的重试次数
    • spark.sql.broadcastTimeout
      broadcast数据的超时等待时间,专门用在broadcast join中

    application shuffle配置相关

    • spark.shuffle.spill
      shuffle读取数据时是否允许数据落盘,默认开启
    • spark.shuffle.spill.numElementsForceSpillThreshold
      shuffle读取数据超过阈值时会自动落盘 默认为256000000,单位是B
    • spark.shuffle.spill.compress
      在落盘时是否压缩数据,默认开启
    • spark.shuffle.io.maxRetries
      shuffle时I/O异常自动重试次数,默认为3次

    application并行配置相关

    注意:并行配置不是越大越好的,并行度越大,会导致任务过于分散,大量的时间花在任务调度,而实际计算时间花费较少。

    • spark.default.parallelism
      shuffle时RDD的partition个数,作用于RDD API, 可被RDD API覆盖 该配置将影响最终输出文件个数
    • spark.sql.shuffle.partitions
      shuffle时RDD的partition个数,作用于Dataset API/SQL查询,可被coalesceNum和Repartition覆盖 该配置将影响最终输出文件个数

    application 输出配置相关

    • spark.driver.maxResultSize
      Spark Action操作(例如collect)返回给driver的总的数据大小,默认为1g
    • spark.hadoop.validateOutputSpecs
      在使用SaveAsHadoopFile等RDD输出API时,是否允许数据已存在。如果设置为true,则会自动覆盖输出目录,否则目录存在时会抛异常。
    • spark.sql.output.merge
      是否开启最后stage的merge,只能作用于写入table操作,不能作用于其他类型的任务
    • spark.sql.output.coalesceNum
      设置最终输出的文件个数,如果少于最后的partition个数,会另起shuffle和新的stage用来重新拆分数据

    application 运行配置相关

    • spark.sql.catalogImplementation
      配置[hive/in-memory]作为metastore,默认in-memory。这个配置只能在写DataFrame时使用,SQL查询时默认配置hive且不允许修改。
      在写非SQL代码时,SparkSession初始化时默认是用in-memory,不会加载hive-site.xml,如果需要访问元数据,需要在初始化时添加enableHiveSupport,设置metastore为hive。 若先创建SparkContext,再创建SparkSession,则enableHiveSupport并不会起作用,需要在初始化SparkContext时添加spark.sql.catalogImplementation=hive。
    • spark.speculation
      是否开启Spark预测执行 开启后,当一个stage的任务task完成度超过75%,且部分task严重超出平均task耗时时,会自动启动新的task attemp并行跑任务,有效缓解慢节点的问题。Spark能够自动识别task,一旦有一个task完成,重试的task会自动失败。
    • spark.hadoop.outputCommitCoordination.enabled
      开启spark.speculation后必须打开 用来保证task重试时多个task只会有一个有效输出,不会产生冗余文件
    • spark.serializer
      Spark在网络传输和缓存数据时使用的序列化类,默认org.apache.spark.serializer. JavaSerializer,推荐org.apache.spark.serializer.KryoSerializer
    • spark.extraListeners
      逗号分隔的SparkListener用户具体实现类List
    • spark.broadcast.blockSize
      broadcast时每个block的大小,默认4m
    • spark.io.compression.codec
      Spark内部数据(RDD、日志等)的编码器,默认lz4,推荐使用 可选配置:lz4, lzf, snappy, and zstd
    • spark.stage.maxConsecutiveAttempts
      stage连续重试最大次数,默认4 此配置配置的是连续重试次数,一般发生于上游数据获取不到,通过DAG重启上游任务重新计算时。
    • spark.files.maxPartitionBytes
      读取文件时一个partition读取的最大字节数,单位是B,默认134217728 (即128 MB)
    • spark.scheduler.mode
      SparkContext调度job的模式,默认为FIFO(先入先出),也可以配置为FAIR(公平调度,会存在资源竞争),推荐使用FAIR
    • spark.task.maxFailures
      一个task的最大重试次数,默认为4,一旦超过会导致Application失败
    • spark.sql.autoBroadcastJoinThreshold
      Spark自动触发BroadcastJoin的阈值,默认10485760 (10 MB) 在小表join大表时,如果小表数据没有超过阈值,会自动使用BroadcastJoin(类似Hive的MapJoin)
    • spark.sql.jsonGenerator.struct.ignore.null
      Spark内部生成Json数据时,是否忽略struct类型的空数据。 若为true,则可能使用DataSet.toJson获取到的数据列与实际不符;若为false,则空数据会自动注为null
    • spark.sql.session.timeZone
      设置Spark运行的默认时区,在事件处理中比较重要

    Structured Streaming配置相关

    • spark.sql.streaming.checkpointLocation
      StructuredStreaming checkpoint的地址。
      若用户通过API配置了option("checkpointLocation",pathA),会优先使用pathA,重启后路径一致,可保证不丢不重;若用户通过API配置了queryName("xxx"),会使用checkpointLocation+queryName作为checkpoint路径,重启后路径一致,可保证不丢不重;若用户未配置queryName,实际路径为checkpointLocation+uuid,重启后路径不一致,不保证不丢不重,仅用于测试

    界面相关配置

    运行中界面相关配置

    • spark.ui.retainedJobs
      运行中Jobs页面展示的job个数,默认1000
    • spark.ui.retainedStages
      运行中stages页面展示的stage个数,默认1000

    History配置相关

    • spark.eventLog.enable
      是否记录Spark events。若为true,可通过history server查看application的历史运行记录,建议开启
    • spark.exentLog.compress
      是否压缩eventLog,默认false,建议开启
    • spark.history.ui.port
      history server的端口配置
    • spark.yarn.historyServer.address
      history server的地址

    完整配置列表请参见spark官方文档

    https://spark.apache.org/docs/2.3.3/configuration.html

    Spark Python API

    https://spark.apache.org/docs/2.3.3/api/python/

    Spark Scala API

    https://spark.apache.org/docs/2.3.3/api/scala/

    Spark快速入门

    https://spark.apache.org/docs/2.3.3/quick-start.html

    上一篇
    SQL参考手册
    下一篇
    常见问题