Spark
Spark Introduction
The Weblog analysis for statistics on daily PV and UV is taken as an example to introduce how to use Spark on the Baidu AI Cloud platform in this document.
Spark is an open-source massive data processing engine. The advanced DAG execution engine of Spark supports periodic computing of data stream and memory, and it runs over 100 times and ten times faster than MapReduce in the memory and hard disk, respectively. Spark provides high-level API for Java, Scala, Python, and R language, and it seamlessly integrates many tools: Spark SQL (SQL), MLlib (machine learning), GraphX (graphic processing), and Spark Streaming (streaming processing). Spark can access data stored on HDFS, HBase, Cassandra, and local file system, and support text file, sequence file, and any input file of Hadoop.
Spark provides end-to-end services:
- Spark’s Driver includes your step program, and parses and generates the step program;
- Driver applies for step running resources with the Master node;
- Master node assigns Core node to the step, and creates Executor at the Core node;
- Driver transmits code and file of Spark step to the assigned Executor;
- Executor runs step, and returns outcome to Driver or write in the specified output location.
Cluster Preparation
- Prepare the data. For more information, please see Data Preparation.
- Preparation for Baidu AI Cloud Environment.
-
Log in to the console, select "Product Service->Baidu MapReduce BMR", and click "Create Cluster" to enter the cluster creation page and configure the following:
- Set cluster name
- Set administrator password
- Disable log
- Select image version “BMR 1.0.0(hadoop 2.7)”
- Select built-in template “spark”
- Keep other default configurations of the cluster, and click "Finish" to view the created cluster in the cluster list page. The cluster is created successfully when cluster status changes from "Initializing" to "Waiting".
Spark Java
Program Preparation
The code of the Spark sample program provided by Baidu AI Cloud is uploaded to https://github.com/BCEBIGDATA/bmr-sample-java. You can clone code through GitHub to design local program and submit to Baidu Object Storage (BOS).
/**
* Analyze log with Spark.
*/
public class AccessLogAnalyzer {
private static final SimpleDateFormat logDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
private static String fetchDate(String gmsTime) {
String date;
try {
date = simpleDateFormat.format(logDateFormat.parse(gmsTime));
} catch (ParseException e) {
date = null;
}
return date;
}
private static final Pattern LOGPATTERN = Pattern.compile(
"(\\S+)\\s+-\\s+\\[(.*?)\\]\\s+\"(.*?)\"\\s+(\\d{3})\\s+(\\S+)\\s+"
+ "\"(.*?)\"\\s+\"(.*?)\"\\s+(.*?)\\s+\"(.*?)\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)");
private static Tuple2<String, String> extractKey(String line) {
Matcher m = LOGPATTERN.matcher(line);
if (m.find()) {
String ipAddr = m.group(1);
String date = fetchDate(m.group(2));
return new Tuple2<>(date, ipAddr);
}
return new Tuple2<>(null, null);
}
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.AccessLogAnalyzer <input> <pv> <uv>");
System.exit(1);
}
SparkConf conf = new SparkConf().setAppName("AccessLogAnalyzer");
JavaSparkContext sc = new JavaSparkContext(conf);
// Parses the log to log records and caches the result.
JavaPairRDD<String, String> distFile = sc.textFile(args[0]).mapToPair(
new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
return extractKey(s);
}
});
distFile.cache();
// Changes the log info to (date, 1) format, and calculates the page view.
JavaPairRDD<String, Integer> pv = distFile.mapToPair(
new PairFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, String> tuple) {
return new Tuple2<>(tuple._1(), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Coalesces to 1 partition and saves as file. Notice that this is for demo purpose only.
pv.coalesce(1, true).saveAsTextFile(args[1]);
// Changes the log info to (date, remoteAddr) and calculates the unique visitors.
JavaPairRDD<String, Integer> uv = distFile.groupByKey().mapToPair(
new PairFunction<Tuple2<String, Iterable<String>>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> tuple) {
int size = new HashSet((Collection<?>) tuple._2()).size();
return new Tuple2<>(tuple._1(), size);
}
});
// Coalesces to 1 partition and saves as file. Notice that this is for demo purpose only.
uv.coalesce(1, true).saveAsTextFile(args[2]);
}
}
Run Spark Steps
- In "Product Service>MapReduce>Baidu MapReduce-Homework List" page, click "Create Step" to enter the step creation page.
-
Configure Spark step parameters as follows:
- Step type: Select “Spark step”.
- Step name: Enter the step name with length not exceeding 255 characters.
- Application location: If using self-complied program, you need to upload program jar package to BOS or your local HDFS, and enter program path there; you can directly use sample program provided by Baidu AI Cloud, and the path is as follows: - Sample program path for clusters in North China - Beijing region:
bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar
. - Sample program path for clusters in South China - Guangzhou region:bos://bmr-public-gz/sample/spark-1.0-SNAPSHOT.jar
. - Action after failure: Continue.
- Spark-submit:
--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer
- Application parameters: Specify the input data path and output path (BOS or HDFS), and the output path must have write permission and not be repeated. For example, if the sample log is used as input data and BOS is used as an output path, the input is as follows: - Parameters for BMR clusters in North China - Beijing region:
bos://bmr-public-bj/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
. - Parameters for BMR clusters in South China - Guangzhou region:bos://bmr-public-gz/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
.
- Select the adaptive cluster in the "Cluster Adaption" section.
- Click "Finish" to complete the creation of the step. The status changes from "Waiting" to "Running" when the step is running, and then it changes to "Completed" when the step is completed. After then, you can view the query result.
Check Results
View the output at the storage system selected by you (BOS or HDFS). You can view the output in BOS as follows:
If you use input data and program provided by the system, you can use the path bos://{your-bucket}/output/pv and bos://{your-bucket}/output/uv to view the output as follows:
------PV------
20151003 139
20151005 372
20151006 114
20151004 375
------UV------
20151003 111
20151005 212
20151006 97
20151004 247
Spark Scala
Program Preparation
You can directly use Sample Program. You can design your program, use the command line cd
to access the root directory of program code, execute mvn package
to generate jar file, and upload to BOS.
Run Spark Steps
- In "Product Service>MapReduce>Baidu MapReduce-Homework List" page, click "Create Step" to enter the step creation page.
-
Configure Spark step parameters as follows:
- Step type: Select “Spark step”.
- Step name: Enter the step name with length not exceeding 255 characters.
- Application location: If using self-complied program, you need to upload program jar package to BOS or your local HDFS, and enter program path there; you can directly use sample program provided by Baidu AI Cloud, and the sample program is only available for clusters in North China - Beijing region. The path is
bos://bmr-public-data/apps/spark/bmr-spark-scala-samples-1.0-SNAPSHOT-jar-with-dependencies.jar
. - Action after failure: Continue.
- Spark-submit:
--class com.baidubce.bmr.sample.AccessLogStatsScalaSample
- Application parameters: Specify the input data path and output path (BOS or HDFS), and the output path must have write permission and not be repeated. For example, if the sample log is used as input data and BOS is used as an output path, the input is as follows: - Parameters for BMR clusters in North China - Beijing region:
bos://bmr-public-bj/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
. - Parameters for BMR clusters in South China - Guangzhou region:bos://bmr-public-gz/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv
.
- Select the adaptive cluster in the "Cluster Adaption" section.
- Click "Finish" to complete the creation of the step. The status changes from "Waiting" to "Running" when the step is running, and changes to "Completed" when the step is completed. After then, you can view the query result.
View Results
View the output at the storage system selected by you (BOS or HDFS). You can view the output in BOS as follows:
If you use input data and program provided by the system, you can use the path bos://{your-bucket}/output/pv and bos://{your-bucket}/output/uv to view the output as follows:
-----PV-----
(20151003,139)
(20151005,372)
(20151006,114)
(20151004,375)
-----UV-----
(20151003,111)
(20151005,212)
(20151006,97)
(20151004,247)
Spark SQL
Configuration
All configurations are same as [Spark Scala](#Spark Scala), and you only need to modify Spark-submit to be --class com.baidubce.bmr.sample.AccessLogStatsSQLSample
.
View Results
View the output at the storage system selected by you (BOS or HDFS). You can view the output in BOS as follows:
If you use input data and program provided by the system, you can use the path bos://{your-bucket}/output/pv and bos://{your-bucket}/output/uv to view the output as follows:
------PV------
+--------+---+
| date| pv||
+--------+---+
|20151003|139||
|20151004|375||
|20151005|372||
|20151006|114||
+--------+---+
------UV------
+--------+---+
| date| uv||
+--------+---+
|20151003|111||
|20151004|247||
|20151005|212||
|20151006| 97||
+--------+---+