Spark
Spark简介
本文以分析Web日志统计每天的PV和UV为例,介绍如何在百度智能云平台使用Spark。
Spark是开源的大规模数据处理引擎。Spark的先进的DAG执行引擎支持周期性数据流和内存计算,在内存中的运算速度是MapReduce的100倍以上,在硬盘中的运算速度是MapReduce的10倍以上。Spark提供了Java、Scala、Python和R语言的高水平API,同时Spark已无缝融合了丰富的工具:Spark SQL(SQL)、MLlib(机器学习)、GraphX(图形处理)、Spark Streaming(流式处理)。Spark可访问存储在HDFS、HBase、Cassandra、本地文件系统等上的数据,支持文本文件、序列文件、以及任何Hadoop的输入文件。
Spark提供端到端的服务:
- Spark的Driver包含您的作业程序,完成作业程序的解析和生成;
- Driver向集群的Master节点申请运行作业所需的资源;
- Master节点为作业分配满足要求的Core节点,并在该节点按要求创建Executor;
- Driver将Spark作业的代码和文件传送给分配的Executor;
- Executor运行作业,将结果返回给Driver或写入指定的输出位置。
集群准备
- 准备数据,请参考数据准备。
- 百度智能云环境准备。
-
登录控制台,选择“产品服务->MapReduce BMR”,点击“创建集群”,进入集群创建页,并做如下配置:
- 设置集群名称
- 设置管理员密码
- 关闭日志开关
- 选择集群版本“BMR 1.6.0”
- 选择集群类型“Hadoop”
- 请保持集群的其他默认配置不变,点击“完成”可在集群列表页可查看已创建的集群。
Spark Java
程序准备
百度智能云提供的Spark样例程序的代码已上传至:https://github.com/BCEBIGDATA/bmr-sample-java,您可通过GitHub克隆代码至本地设计自己的程序,并上传到对象存储BOS(具体操作详见对象存储BOS入门指南)。
/**
* Analyze log with Spark.
*/
public class AccessLogAnalyzer {
private static final SimpleDateFormat logDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
private static String fetchDate(String gmsTime) {
String date;
try {
date = simpleDateFormat.format(logDateFormat.parse(gmsTime));
} catch (ParseException e) {
date = null;
}
return date;
}
private static final Pattern LOGPATTERN = Pattern.compile(
"(\\S+)\\s+-\\s+\\[(.*?)\\]\\s+\"(.*?)\"\\s+(\\d{3})\\s+(\\S+)\\s+"
+ "\"(.*?)\"\\s+\"(.*?)\"\\s+(.*?)\\s+\"(.*?)\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)");
private static Tuple2<String, String> extractKey(String line) {
Matcher m = LOGPATTERN.matcher(line);
if (m.find()) {
String ipAddr = m.group(1);
String date = fetchDate(m.group(2));
return new Tuple2<>(date, ipAddr);
}
return new Tuple2<>(null, null);
}
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.AccessLogAnalyzer <input> <pv> <uv>");
System.exit(1);
}
SparkConf conf = new SparkConf().setAppName("AccessLogAnalyzer");
JavaSparkContext sc = new JavaSparkContext(conf);
// Parses the log to log records and caches the result.
JavaPairRDD<String, String> distFile = sc.textFile(args[0]).mapToPair(
new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
return extractKey(s);
}
});
distFile.cache();
// Changes the log info to (date, 1) format, and caculates the page view.
JavaPairRDD<String, Integer> pv = distFile.mapToPair(
new PairFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, String> tuple) {
return new Tuple2<>(tuple._1(), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Coalesces to 1 partition and saves as file. Notice that this is for demo purpose only.
pv.coalesce(1, true).saveAsTextFile(args[1]);
// Changes the log info to (date, remoteAddr) and caculates the unique visitors.
JavaPairRDD<String, Integer> uv = distFile.groupByKey().mapToPair(
new PairFunction<Tuple2<String, Iterable<String>>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> tuple) {
int size = new HashSet((Collection<?>) tuple._2()).size();
return new Tuple2<>(tuple._1(), size);
}
});
// Coalesces to 1 partition and saves as file. Notice that this is for demo purpose only.
uv.coalesce(1, true).saveAsTextFile(args[2]);
}
}
运行Spark作业
- 在“产品服务>MapReduce>MapReduce-作业列表”页中,点击“创建作业”,进入创建作业页。
-
配置Spark作业参数,具体如下:
- 作业类型:选择“Spark作业”。
- 作业名称:输入作业名称,长度不可超过255个字符。
- 应用程序位置:若使用您自行编译的程序,请上传程序jar包至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,路径如下:
- 华北-北京区域的集群对应的样例程序路径:
bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar
。 - 华南-广州区域的集群对应的样例程序路径:bos://bmr-public-gz/sample/spark-1.0-SNAPSHOT.jar
。 - 失败后操作:继续。
- Spark-submit:
--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer
- 应用程序参数:指定输入数据的路径、结果输出的路径(可选BOS或HDFS),其中输出路径必须具有写权限且该路径不能已存在。以样例日志作为输入数据,BOS作为输出路径为例,输入如下:
- 华北-北京区域的BMR集群对应的参数:
bos://bmr-public-bj/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
。 - 华南-广州区域的BMR集群对应的参数:bos://bmr-public-gz/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
。
- 在“集群适配”区,选择适配的集群。
- 点击“完成”,则作业创建完成;运行中的作业状态会由“等待中”更新为“运行中”,当作业运行完毕后状态会更新为“已完成”,便可查看到查询结果了。
查看结果
请到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:
如果使用系统提供的输入数据和程序,可以在bos://{your-bucket}/output/pv和bos://{your-bucket}/output/uv路径下查看输出结果如下:
------PV------
20151003 139
20151005 372
20151006 114
20151004 375
------UV------
20151003 111
20151005 212
20151006 97
20151004 247
Spark Scala
程序准备
您可以直接使用样例程序。也可设计自己的程序,并在命令行下cd
到程序代码的根目录下,执行mvn package
生成jar文件,并上传到对象存储BOS(具体操作详见对象存储BOS入门指南)。
运行Spark作业
- 在“产品服务>MapReduce>MapReduce-作业列表”页中,点击“创建作业”,进入创建作业页。
-
配置Spark作业参数,具体如下:
- 作业类型:选择“Spark作业”。
- 作业名称:输入作业名称,长度不可超过255个字符。
- 应用程序位置:若使用您自行编译的程序,请上传程序jar包至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,仅华北-北京区域的集群可使用,路径为
bos://bmr-public-data/apps/spark/bmr-spark-scala-samples-1.0-SNAPSHOT-jar-with-dependencies.jar
。 - 失败后操作:继续。
- Spark-submit:
--class com.baidubce.bmr.sample.AccessLogStatsScalaSample
- 应用程序参数:指定输入数据的路径、结果输出的路径(可选BOS或HDFS),其中输出路径必须具有写权限且该路径不能已存在。以样例日志作为输入数据,BOS作为输出路径为例,输入如下:
- 华北-北京区域的BMR集群对应的参数:
bos://bmr-public-bj/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
。 - 华南-广州区域的BMR集群对应的参数:bos://bmr-public-gz/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
。
- 在“集群适配”区,选择适配的集群。
- 点击“完成”,则作业创建完成;运行中的作业状态会由“等待中”更新为“运行中”,当作业运行完毕后状态会更新为“已完成”,便可查看到查询结果了。
查看结果
请到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:
如果使用系统提供的输入数据和程序,可以在bos://{your-bucket}/output/pv和bos://{your-bucket}/output/uv路径下查看输出结果如下:
-----PV-----
(20151003,139)
(20151005,372)
(20151006,114)
(20151004,375)
-----UV-----
(20151003,111)
(20151005,212)
(20151006,97)
(20151004,247)
Spark SQL
配置
所有配置与[Spark Scala](#Spark Scala)一致,只需修改Spark-submit为
--class com.baidubce.bmr.sample.AccessLogStatsSQLSample
查看结果
请到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:
如果使用系统提供的输入数据和程序,可以在bos://{your-bucket}/output/pv和bos://{your-bucket}/output/uv路径下查看输出结果如下:
------PV------
+--------+---+
| date| pv|
+--------+---+
|20151003|139|
|20151004|375|
|20151005|372|
|20151006|114|
+--------+---+
------UV------
+--------+---+
| date| uv|
+--------+---+
|20151003|111|
|20151004|247|
|20151005|212|
|20151006| 97|
+--------+---+
Spark Native Engine
截止目前,Spark是一个运行在JVM上的大数据计算引擎,其计算性能依然有很大的提升空间。我们通过引入ClickHouse计算引擎实现了Spark性能的巨大提升,同时还能保证和现有Spark的兼容性。
基本原理
我们先以一段简单的聚合SQL为例,看下Spark是如何执行的
我们的SQL代码经过解析先在Driver中生成了执行计划,执行计划优化后最终转化为一个个的RDD,每个RDD和其要处理的分片信息最终会发往对应的Executor,以Task为执行单元来执行对应的计算逻辑。这里可以将一个Task当做Executor中的一个线程,这个线程里完成了主要的计算逻辑。在Task中实际执行的Scan、Filter、Agg等计算逻辑就是由对应的RDD生成的。
实际上ClickHouse中也有对应于Scan、Filter、Agg这样的计算逻辑,所以我们可以将每个Task中的实际计算逻辑交给ClickHouse来执行,凭借ClickHouse强大的计算性能实现Spark性能的提升。我们这里称使用ClickHouse加速以后的Spark引擎为SNE(Spark Native Engine)。
下面是这段SQL使用SNE执行时的执行计划以及每个Task中Java和ClickHouse的调用关系。
可以看到我们使用JNI将执行计划信息发给了ClickHouse,在整体分布式框架上,依然保持了Spark现有的架构。
使用方法
使用SNE的方法非常简单,只需要增加如下配置即可:
spark.sql.execution.clickhouse true
由于我们目前还没有支持所有Spark特性,对于不支持的情况,我们会自动回退到原生Spark来执行用户的请求。另外我们支持了如下配置可以强制使用SNE执行而不回滚
spark.sql.clickhouse.fallback false
这个配置的默认值是true
,就是会自动回滚。
进展
在数据类型方面,我们目前已经支持了常见的基础类型:BooleanType, ByteType, IntegerType, LongType, FloatType, DoubleType, DecimalType, TimestampType, DateType, StringType。
在算子级别的覆盖度见下表:
Operator | 支持度 |
---|---|
FileSourceScanExec | 支持 |
FilterExec | 支持 |
ProjectExec | 支持 |
HashAggregateExec | 支持 |
SortExec | 支持 |
BroadcastHashJoinExec | 支持 |
ShuffledHashJoinExec | 支持 |
SortMergeJoinExec | 支持 |
CartesianProductExec | 支持 |
BroadcastNestedLoopJoinExec | 支持 |
WindowExec | 支持 |
GlobalLimitExec | 支持 |
LocalLimitExec | 支持 |
CollectLimitExec | 支持 |
TakeOrderedAndProjectExec | 支持 |
ExpandExec | 支持 |
UnionExec | 支持 |
ShuffleExchangeExec | 支持 |
BroadcastExchangeExec | 支持 |
SubqueryBroadcastExec | 支持 |
DataWritingCommandExec | 不支持 |
BatchScanExec | 不支持 |
ObjectHashAggregateExec | 不支持 |
SortAggregateExec | 不支持 |
CoalesceExec | 不支持 |
GenerateExec | 不支持 |
RangeExec | 不支持 |
SampleExec | 不支持 |
CustomShuffleReaderExec | 不支持 |
InMemoryTableScanExec | 不支持 |
AggregateInPandasExec | 不支持 |
ArrowEvalPythonExec | 不支持 |
FlatMapGroupsInPandasExec | 不支持 |
MapInPandasExec | 不支持 |
WindowInPandasExec | 不支持 |
Velox2Row | 不支持 |
Velox2Arrow | 不支持 |
这里我们虽然只支持了一半多一点的算子,但是在SQL场景,基本上都已经都能覆盖到了。在SQL场景,还有两类SQL需要特别说明一下:
- 我们虽然不支持DataWritingCommandExec算子,但是对于包括了该算子的SQL,比如
insert into table_x select ...
,我们会将除了DataWritingCommandExec以外的算子全部执行在ClickHouse上,最后调用原生Spark的DataWritingCommandExec来完成计算。所以除了DataWritingCommandExec以外的查询算子依然可以享受到ClickHouse的性能提速。 - 还有Scan相关的算子,比如BatchScanExec、InMemoryTableScanExec,如果SQL用到了这些算子,目前会将整个SQL回退到原生Spark执行。这些算子我们接下来会尽快支持。
相对于原生Spark的内建函数,我们目前先支持了其中较常用的部分,剩余函数会逐步支持。目前支持的函数如下:
函数 | 说明 |
---|---|
cast | |
coalesce | |
isNull | |
isNotNull | |
abs | |
year | |
month | |
day/dayofmonth | |
dayofweek | |
weekofyear | |
hour | |
minute | |
second | |
quarter | |
dayofyear | |
upper/ucase | |
!/not | |
~ | |
cbrt | |
cos | |
floor | |
ceil/ceiling | |
sqrt | |
length/char_length/character_length | 暂不支持BinaryType |
log2 | |
log10 | |
+ | |
- | |
* | |
/ | |
%/mod | 暂不支持DecimalType |
and | |
or | |
=/== | |
< | |
> | |
<= | |
>= | |
datediff | |
date_add | |
date_sub | |
add_months | |
date_format | |
like | |
round | |
shiftright | |
shiftleft | |
& | |
| | |
^ | |
pow/power | |
instr | |
substr/substring | |
regexp_extract | |
locate/position | |
regexp_replace | |
concat | |
when | |
in | |
parse_url | 不支持3个参数的版本,另外第二个参数只支持常量 |
在性能方面,在5T规模的TPC-DS数据集下,相对于社区版Spark的性能对比见下图
其中原生Spark总的执行时间是15257秒,SNE总的执行时间为7779秒。