基础使用
Spark SQL 基础操作
Spark SQL允许用户直接运用SQL语句对数据进行操作,在此过程中,Spark会负责对SQL语句进行解析、优化以及执行。
以下示例展示了如何使用Spark SQL进行读取文件。示例如下:
- 示例1:Spark支持多种数据格式,本示例读取了JSON格式文件的数据,并输出为Parquet格式。
val peopleDF = spark.read.json("examples/student.json")
peopleDF.write.parquet("student.parquet")
- 示例2:借助 SQL 从 parquetFile 表中提取出年龄处于 13 岁至 19 岁区间的年轻人的名字,接着将这些数据转换为 DataFrame。之后,利用 Map 操作把名字处理成可读的形式,最后输出处理后的结果。
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
启动Spark Shell
Spark的Shell是一款功能强大的交互式数据分析工具,它为学习API提供了一种简便途径。在Spark中,你既能够运用Scala语言,也能够使用Python语言。
操作步骤:
- 通过SSH方式连接集群,详情请参见登录集群。
- 执行以下命令,启动Spark Shell。
spark-shell
RDD基础操作
Spark是以弹性分布式数据集(RDD)这一概念为核心构建的,RDD是能够进行并行操作且具备容错能力的元素集合。在Spark里,创建RDD有两种途径,一是通过集合来创建,二是借助外部数据集进行构建。像共享文件系统、HDFS、HBase或者任何提供Hadoop InputFormat的数据集,都可用于构建RDD。
- 创建RDD示例:
- 通过集合来创建RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
- 通过外部数据集构建RDD
val distFile = sc.textFile("data.txt")
当 RDD 成功构建完成后,可以针对它开展一系列操作,诸如 Map 和 Reduce 等操作。
比如,执行以下代码时,会先从外部存储系统读取一个文本文件,利用该文件构建出一个RDD。接着,借助RDD的Map算子对其进行运算,从而得到文本文件中每一行的长度。最后,再通过Reduce算子进行计算,得出文本文件中各行长度的总和。
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
一般来说,Spark RDD有两种常见操作,分别是Transform操作和Action操作。Transform操作不会马上执行,而是要等到执行Action操作时才会真正执行。
- Transform操作
操作 | 描述 |
---|---|
map() |
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。 |
flatMap() |
参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。 |
filter() |
参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。 |
distinct() |
没有参数,将RDD里的元素进行去重操作。 |
union() |
参数是RDD,生成包含两个RDD所有元素的新RDD。 |
intersection() |
参数是RDD,求出两个RDD的共同元素。 |
subtract() |
参数是RDD,去掉原RDD里和参数RDD里相同的元素。 |
cartesian() |
参数是RDD,求两个RDD的笛卡尔积。 |
- Action操作
操作 | 描述 |
---|---|
collect() |
返回RDD所有元素。 |
count() |
返回RDD中的元素个数。 |
countByValue() |
返回各元素在RDD中出现的次数。 |
reduce() |
并行整合所有RDD数据,例如求和操作。 |
fold(0)(func) |
和reduce() 功能一样,但是fold带有初始值。 |
aggregate(0)(seqOp,combop) |
和reduce() 功能一样,但是返回的RDD数据类型和原RDD不一样。 |
foreach(func) |
对RDD每个元素都是使用特定函数。 |
PySpark基础操作
- SSH登录集群,参考SSH连接到集群
- 执行以下命令,进入PySpark交互式环境:
pyspark
- 初始化SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
- 创建DataFrame
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
],schema='a long, b double, c string, d date, e timestamp')
DataFrame创建完成后,可以通过各种类型的transform算子完成数据计算。
SparkSQL UDF 基础操作
Spark SQL具备众多内建函数,能够满足您的计算需求。同时,您还可以通过创建自定义函数(UDF)来满足各种不同的计算需求。UDF在使用方式上和普通的内建函数相近。本文将为您介绍在Spark SQL中使用Hive自定义函数的具体流程。
前提条件
已在Hive中创建了UDF,详情请参见开发UDF。
使用Hive UDF
- 使用文件传输工具,上传生成的JAR包至集群任意目录(本文以test目录为例)。
- 上传JAR包至HDFS或BOS(本文以HDFS为例)。
a.通过SSH方式登录集群,详情请参见登录集群。
b.执行以下命令,上传JAR包到HDFS:
hadoop fs -put /test/hiveudf-1.0-SNAPSHOT.jar /user/hive/warehouse/
您可以通过hadoop fs -ls /user/hive/warehouse/
命令,查看是否上传成功。待返回信息如下所示表示上传成功。
Found 1 items
-rw-r--r-- 1 xx xx 2668 2021-06-09 14:13 /user/hive/warehouse/hiveudf-1.0-SNAPSHOT.jar
- 执行以下命令,使用UDF函数。
该函数与内置函数使用方式一样,直接使用函数名称即可访问。
select myfunc("abc");
返回如下信息。
OK
abc:HelloWorld