Spark

Spark简介

本文以分析Web日志统计每天的PV和UV为例,介绍如何在百度智能云平台使用Spark。

Spark是开源的大规模数据处理引擎。Spark的先进的DAG执行引擎支持周期性数据流和内存计算,在内存中的运算速度是MapReduce的100倍以上,在硬盘中的运算速度是MapReduce的10倍以上。Spark提供了Java、Scala、Python和R语言的高水平API,同时Spark已无缝融合了丰富的工具:Spark SQL(SQL)、MLlib(机器学习)、GraphX(图形处理)、Spark Streaming(流式处理)。Spark可访问存储在HDFS、HBase、Cassandra、本地文件系统等上的数据,支持文本文件、序列文件、以及任何Hadoop的输入文件。

Spark提供端到端的服务:

  1. Spark的Driver包含您的作业程序,完成作业程序的解析和生成;
  2. Driver向集群的Master节点申请运行作业所需的资源;
  3. Master节点为作业分配满足要求的Core节点,并在该节点按要求创建Executor;
  4. Driver将Spark作业的代码和文件传送给分配的Executor;
  5. Executor运行作业,将结果返回给Driver或写入指定的输出位置。

集群准备

  1. 准备数据,请参考数据准备
  2. 百度智能云环境准备
  3. 登录控制台,选择“产品服务->百度MapReduce BMR”,点击“创建集群”,进入集群创建页,并做如下配置:
    • 设置集群名称
    • 设置管理员密码
    • 关闭日志开关
    • 选择镜像版本“BMR 1.0.0(hadoop 2.7)”
    • 选择内置模板“spark”
  4. 请保持集群的其他默认配置不变,点击“完成”可在集群列表页可查看已创建的集群,当集群状态由“初始化中”变为“空闲中”时,集群创建成功。

Spark Java

程序准备

百度智能云提供的Spark样例程序的代码已上传至:https://github.com/BCEBIGDATA/bmr-sample-java,您可通过GitHub克隆代码至本地设计自己的程序,并上传到对象存储BOS(具体操作详见对象存储BOS入门指南)。

/**
 * Analyze log with Spark.
 */
public class AccessLogAnalyzer {

    private static final SimpleDateFormat logDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");

    private static String fetchDate(String gmsTime) {
        String date;
        try {
            date = simpleDateFormat.format(logDateFormat.parse(gmsTime));
        } catch (ParseException e) {
            date = null;
        }
        return date;
    }

    private static final Pattern LOGPATTERN = Pattern.compile(
            "(\\S+)\\s+-\\s+\\[(.*?)\\]\\s+\"(.*?)\"\\s+(\\d{3})\\s+(\\S+)\\s+"
            + "\"(.*?)\"\\s+\"(.*?)\"\\s+(.*?)\\s+\"(.*?)\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)");

    private static Tuple2<String, String> extractKey(String line) {
        Matcher m = LOGPATTERN.matcher(line);
        if (m.find()) {
            String ipAddr = m.group(1);
            String date = fetchDate(m.group(2));
            return new Tuple2<>(date, ipAddr);
        }
        return new Tuple2<>(null, null);
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.AccessLogAnalyzer <input> <pv> <uv>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setAppName("AccessLogAnalyzer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Parses the log to log records and caches the result.
        JavaPairRDD<String, String> distFile = sc.textFile(args[0]).mapToPair(
                new PairFunction<String, String, String>() {
                    @Override
                    public Tuple2<String, String> call(String s) {
                        return extractKey(s);
                    }
                });
        distFile.cache();

        // Changes the log info to (date, 1) format, and caculates the page view.
        JavaPairRDD<String, Integer> pv = distFile.mapToPair(
                new PairFunction<Tuple2<String, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, String> tuple) {
                        return new Tuple2<>(tuple._1(), 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override