Importing from Spark
Updated: 2024-11-25
Integrating Apache Spark with ClickHouse
There are two main ways to connect Apache Spark and ClickHouse:
- Spark connector - this connector implements DataSourceV2 and has its own catalog management. As of today, it is the recommended way to integrate ClickHouse and Spark.
- Spark JDBC - integrates Spark and ClickHouse using a JDBC data source.
Spark connector
This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. It is based on ClickHouse's official JDBC connector and manages its own catalog.
Requirements
- Java 8 or 17
- Scala 2.12 or 2.13
- Apache Spark 3.3, 3.4, or 3.5
Compatibility
Version | Compatible Spark Versions | ClickHouse JDBC Version |
---|---|---|
main | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
0.6.0 | Spark 3.3 | 0.3.2-patch11 |
0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
0.4.0 | Spark 3.2, 3.3 | No dependency |
0.3.0 | Spark 3.2, 3.3 | No dependency |
0.2.1 | Spark 3.2 | No dependency |
0.1.2 | Spark 3.2 | No dependency |
Download the library
The naming pattern of the binary JAR is:
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
You can find all available released JARs in the Maven Central Repository and all daily-build SNAPSHOT JARs in the Sonatype OSS Snapshots Repository.
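For example, with Spark 3.5, Scala 2.12, and connector version 0.8.0 (a combination taken from the compatibility table above), the JAR would be named:
clickhouse-spark-runtime-3.5_2.12-0.8.0.jar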
Import as a dependency
- Gradle
dependencies {
implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}
If you want to use a SNAPSHOT version, add the following repository:
repositories {
maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}
- Maven
<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
If you want to use a SNAPSHOT version, add the following repository:
<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
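For sbt-based Scala projects (not covered by the Gradle and Maven examples above), an equivalent declaration might look like the following sketch; the concrete versions are taken from the compatibility table and should be adjusted to your Spark and Scala versions:
// build.sbt (sketch): connector 0.8.0 built for Spark 3.5 / Scala 2.12, ClickHouse JDBC 0.6.3
libraryDependencies ++= Seq(
  "com.clickhouse.spark" % "clickhouse-spark-runtime-3.5_2.12" % "0.8.0",
  // the "all" classifier bundles the JDBC driver's dependencies; transitives are excluded as in the Maven example
  ("com.clickhouse" % "clickhouse-jdbc" % "0.6.3" classifier "all").intransitive()
)
// For SNAPSHOT versions, also add:
resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"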
Using Spark SQL
Note: For SQL-only use cases, Apache Kyuubi is recommended for production environments.
Launch the Spark SQL CLI
$SPARK_HOME/bin/spark-sql \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
The following argument:
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
can be replaced with the following, to avoid copying the JAR files to your Spark client node:
--repositories https://{maven-central-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
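The same catalog settings can also be supplied programmatically when a SparkSession is built from application code. Below is a minimal Scala sketch, assuming the connector and JDBC JARs are already on the classpath; the host, port, and credentials are placeholders matching the defaults above:
import org.apache.spark.sql.SparkSession

// Register the ClickHouse catalog on the session (values are placeholders; adjust for your deployment).
val spark = SparkSession.builder()
  .appName("clickhouse-example")
  .master("local[*]")
  .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .config("spark.sql.catalog.clickhouse.user", "default")
  .config("spark.sql.catalog.clickhouse.password", "")
  .config("spark.sql.catalog.clickhouse.database", "default")
  .getOrCreate()

// The ClickHouse catalog is now addressable from Spark SQL.
spark.sql("use clickhouse")
spark.sql("show databases").show()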
Operations
Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table.
spark-sql> use clickhouse;
Time taken: 0.016 seconds
spark-sql> create database if not exists test_db;
Time taken: 0.022 seconds
spark-sql> show databases;
default
system
test_db
Time taken: 0.289 seconds, Fetched 3 row(s)
spark-sql> CREATE TABLE test_db.tbl_sql (
> create_time TIMESTAMP NOT NULL,
> m INT NOT NULL COMMENT 'part key',
> id BIGINT NOT NULL COMMENT 'sort key',
> value STRING
> ) USING ClickHouse
> PARTITIONED BY (m)
> TBLPROPERTIES (
> engine = 'MergeTree()',
> order_by = 'id',
> settings.index_granularity = 8192
> );
Time taken: 0.242 seconds
spark-sql> insert into test_db.tbl_sql values
> (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
> (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
> as tabl(create_time, m, id, value);
Time taken: 0.276 seconds
spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
Time taken: 0.116 seconds, Fetched 2 row(s)
spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 1.028 seconds
spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 0.462 seconds
spark-sql> select count(*) from test_db.tbl_sql;
6
Time taken: 1.421 seconds, Fetched 1 row(s)
spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.123 seconds, Fetched 6 row(s)
spark-sql> delete from test_db.tbl_sql where id = 1;
Time taken: 0.129 seconds
spark-sql> select * from test_db.tbl_sql;
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.101 seconds, Fetched 3 row(s)
Using the Spark Shell
Launch the Spark Shell
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
The following argument:
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
can be replaced with the following, to avoid copying the JAR files to your Spark client node:
--repositories https://{maven-central-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
Basic shell operations
Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table.
scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
| default|
| system|
| test_db|
+---------+
scala> spark.sql("""
| CREATE TABLE test_db.tbl (
| create_time TIMESTAMP NOT NULL,
| m INT NOT NULL COMMENT 'part key',
| id BIGINT NOT NULL COMMENT 'sort key',
| value STRING
| ) USING ClickHouse
| PARTITIONED BY (m)
| TBLPROPERTIES (
| engine = 'MergeTree()',
| order_by = 'id',
| settings.index_granularity = 8192
| )
| """)
res2: org.apache.spark.sql.DataFrame = []
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.createDataFrame(Seq(
("2021-01-01 10:10:10", 1L, "1"),
("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
.withColumn("create_time", to_timestamp($"create_time"))
.withColumn("m", month($"create_time"))
.select($"create_time", $"m", $"id", $"value")
.writeTo("test_db.tbl")
.append
// Exiting paste mode, now interpreting.
scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10| 1| 1| 1|
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+
scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []
scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+
Execute ClickHouse native SQL.
scala> val options = Map(
| "host" -> "clickhouse",
| "protocol" -> "http",
| "http_port" -> "8123",
| "user" -> "default",
| "password" -> ""
| )
scala> val sql = """
| |CREATE TABLE test_db.person (
| | id Int64,
| | name String,
| | age Nullable(Int32)
| |)
| |ENGINE = MergeTree()
| |ORDER BY id
| """.stripMargin
scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options)
scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| test_db| person| false|
+---------+---------+-----------+
scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
root
|-- id: long (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = true)
Supported data types
This section outlines the data type mappings between Spark and ClickHouse. The tables below provide a quick reference for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.
Reading data from ClickHouse into Spark
ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes |
---|---|---|---|---|
Nothing | NullType | ✅ | Yes | |
Bool | BooleanType | ✅ | Yes | |
UInt8, Int16 | ShortType | ✅ | Yes | |
Int8 | ByteType | ✅ | Yes | |
UInt16, Int32 | IntegerType | ✅ | Yes | |
UInt32, Int64, UInt64 | LongType | ✅ | Yes | |
Int128, UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Yes | |
Float32 | FloatType | ✅ | Yes | |
Float64 | DoubleType | ✅ | Yes | |
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Yes | |
FixedString | BinaryType, StringType | ✅ | Yes | Controlled by the configuration READ_FIXED_STRING_AS |
Decimal | DecimalType | ✅ | Yes | Precision and scale up to Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | Yes | |
Decimal64 | DecimalType(18, scale) | ✅ | Yes | |
Decimal128 | DecimalType(38, scale) | ✅ | Yes | |
Date, Date32 | DateType | ✅ | Yes | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Yes | |
Array | ArrayType | ✅ | No | Array element type is also converted |
Map | MapType | ✅ | No | Keys are limited to StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | Yes | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | Yes | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | No | The specific interval type is used |
Object | ❌ | | | |
Nested | ❌ | | | |
Tuple | ❌ | | | |
Point | ❌ | | | |
Polygon | ❌ | | | |
MultiPolygon | ❌ | | | |
Ring | ❌ | | | |
IntervalQuarter | ❌ | | | |
IntervalWeek | ❌ | | | |
Decimal256 | ❌ | | | |
AggregateFunction | ❌ | | | |
SimpleAggregateFunction | ❌ | | | |
Inserting data from Spark into ClickHouse
Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes |
---|---|---|---|---|
BooleanType | UInt8 | ✅ | Yes | |
ByteType | Int8 | ✅ | Yes | |
ShortType | Int16 | ✅ | Yes | |
IntegerType | Int32 | ✅ | Yes | |
LongType | Int64 | ✅ | Yes | |
FloatType | Float32 | ✅ | Yes | |
DoubleType | Float64 | ✅ | Yes | |
StringType | String | ✅ | Yes | |
VarcharType | String | ✅ | Yes | |
CharType | String | ✅ | Yes | |
DecimalType | Decimal(p, s) | ✅ | Yes | Precision and scale up to Decimal128 |
DateType | Date | ✅ | Yes | |
TimestampType | DateTime | ✅ | Yes | |
ArrayType (list, tuple, or array) | Array | ✅ | No | Array element type is also converted |
MapType | Map | ✅ | No | Keys are limited to StringType |
Object | ❌ | | | |
Nested | ❌ | | | |
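To illustrate a few of these write mappings, here is a short Scala sketch that can be run in the spark-shell session configured earlier (the table name test_db.tbl_types is hypothetical, and column nullability or exact ClickHouse types may need adjustment for your connector version):
import org.apache.spark.sql.functions._

// LongType -> Int64, ArrayType -> Array, MapType with String keys -> Map (per the table above).
spark.sql("""
  CREATE TABLE test_db.tbl_types (
    id    BIGINT NOT NULL COMMENT 'sort key',
    tags  ARRAY<STRING>,
    attrs MAP<STRING, INT>
  ) USING ClickHouse
  TBLPROPERTIES (
    engine = 'MergeTree()',
    order_by = 'id'
  )
""")

// id: LongType, tags: ArrayType(StringType), attrs: MapType(StringType, IntegerType)
spark.range(3)
  .withColumn("tags", array(lit("a"), lit("b")))
  .withColumn("attrs", map(lit("k"), col("id").cast("int")))
  .writeTo("test_db.tbl_types")
  .append()

spark.table("test_db.tbl_types").show()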
Spark JDBC
Reading data
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

// Example wrapper class so the snippet compiles as a standalone program.
public class ReadData {
    public static void main(String[] args) {
        // Initialize the Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");
        // Load the table from ClickHouse
        Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);
        // Show the DataFrame
        df.show();
        // Stop the Spark session
        spark.stop();
    }
}
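For Scala applications, the same read can be expressed with the generic DataFrameReader JDBC options; below is a minimal sketch assuming the same local ClickHouse instance and the example_table from the Java snippet above:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("example").master("local").getOrCreate()

// Standard Spark JDBC options; the ClickHouse JDBC driver must be on the classpath.
// If the driver is not auto-registered, it may also need .option("driver", "com.clickhouse.jdbc.ClickHouseDriver").
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ch://localhost:8123/default")
  .option("dbtable", "example_table")
  .option("user", "default")
  .option("password", "123456")
  .load()

df.show()
spark.stop()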
Writing data
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Example wrapper class so the snippet compiles as a standalone program.
public class WriteData {
    public static void main(String[] args) {
        // Initialize the Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "******");
        // Create a sample DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });
        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));
        Dataset<Row> df = spark.createDataFrame(rows, schema);
        // Append the DataFrame to the target table over JDBC
        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "my_table", jdbcProperties);
        // Show the DataFrame
        df.show();
        // Stop the Spark session
        spark.stop();
    }
}