Importing from Spark
Last updated: 2025-01-20
Integrating Apache Spark with ClickHouse
There are two main ways to connect Apache Spark and ClickHouse:
- Spark Connector: implements the DataSourceV2 interface and has its own catalog management. As of today, this is the recommended way to integrate ClickHouse and Spark.
- Spark JDBC: integrates Spark and ClickHouse using the JDBC data source.
Spark Connector
This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. It is based on ClickHouse's official JDBC connector and manages its own catalog.
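The catalog options shown later with --conf flags can also be applied when building the SparkSession in code. Below is a minimal sketch, assuming a local ClickHouse with default credentials and the connector JARs already on the classpath:

```scala
// A minimal sketch: registering the ClickHouse catalog on a SparkSession.
// Host, port, and credentials are illustrative local defaults.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("clickhouse-example")
  .master("local")
  .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .config("spark.sql.catalog.clickhouse.user", "default")
  .config("spark.sql.catalog.clickhouse.password", "")
  .config("spark.sql.catalog.clickhouse.database", "default")
  .getOrCreate()

// The catalog name ("clickhouse") is the suffix of the conf key above.
spark.sql("use clickhouse")
```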
Requirements
- Java 8 or 17
- Scala 2.12 or 2.13
- Apache Spark 3.3, 3.4, or 3.5
Compatibility Matrix
| Version | Compatible Spark Versions | ClickHouse JDBC Version | 
|---|---|---|
| main | Spark 3.3, 3.4, 3.5 | 0.6.3 | 
| 0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 | 
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 | 
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 | 
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 | 
| 0.4.0 | Spark 3.2, 3.3 | No dependency | 
| 0.3.0 | Spark 3.2, 3.3 | No dependency | 
| 0.2.1 | Spark 3.2 | No dependency | 
| 0.1.2 | Spark 3.2 | No dependency | 
Download the Library
The name pattern of the binary JAR is:
```
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
```
You can find all available released JARs in the Maven Central Repository and all daily-built SNAPSHOT JARs in the Sonatype OSS Snapshots Repository.
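For example, with Spark 3.5, Scala 2.12, and connector release 0.8.0 (per the compatibility matrix above), the resolved name would be `clickhouse-spark-runtime-3.5_2.12-0.8.0.jar`.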
Import as a Dependency
Gradle
```gradle
dependencies {
  implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
  implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}
```
If you want to use a SNAPSHOT version, add the following repository:
```gradle
repositories {
  maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}
```
Maven
```xml
<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```
If you want to use a SNAPSHOT version, add the following repository:
```xml
<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>
```
Using Spark SQL
Note: For SQL-only use cases, Apache Kyuubi is recommended for production.
Launch the Spark SQL CLI
```shell
$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.protocol=http \
  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=default \
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```
The following argument:
```shell
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```
can be replaced with the following, to avoid copying the JARs to the Spark client node:
```shell
  --repositories https://{maven-central-mirror or private-nexus-repo} \
  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
```
Operations
Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table:
```
spark-sql> use clickhouse;
Time taken: 0.016 seconds

spark-sql> create database if not exists test_db;
Time taken: 0.022 seconds

spark-sql> show databases;
default
system
test_db
Time taken: 0.289 seconds, Fetched 3 row(s)

spark-sql> CREATE TABLE test_db.tbl_sql (
         >   create_time TIMESTAMP NOT NULL,
         >   m           INT       NOT NULL COMMENT 'part key',
         >   id          BIGINT    NOT NULL COMMENT 'sort key',
         >   value       STRING
         > ) USING ClickHouse
         > PARTITIONED BY (m)
         > TBLPROPERTIES (
         >   engine = 'MergeTree()',
         >   order_by = 'id',
         >   settings.index_granularity = 8192
         > );
Time taken: 0.242 seconds

spark-sql> insert into test_db.tbl_sql values
         > (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
         > (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
         > as tabl(create_time, m, id, value);
Time taken: 0.276 seconds

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1   1   1
2022-02-02 10:10:10 2   2   2
Time taken: 0.116 seconds, Fetched 2 row(s)

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 1.028 seconds

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 0.462 seconds

spark-sql> select count(*) from test_db.tbl_sql;
6
Time taken: 1.421 seconds, Fetched 1 row(s)

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1   1   1
2021-01-01 10:10:10 1   1   1
2021-01-01 10:10:10 1   1   1
2022-02-02 10:10:10 2   2   2
2022-02-02 10:10:10 2   2   2
2022-02-02 10:10:10 2   2   2
Time taken: 0.123 seconds, Fetched 6 row(s)

spark-sql> delete from test_db.tbl_sql where id = 1;
Time taken: 0.129 seconds

spark-sql> select * from test_db.tbl_sql;
2022-02-02 10:10:10 2   2   2
2022-02-02 10:10:10 2   2   2
2022-02-02 10:10:10 2   2   2
Time taken: 0.101 seconds, Fetched 3 row(s)
```
Using the Spark Shell
Launch the Spark Shell
```shell
$SPARK_HOME/bin/spark-shell \
  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.protocol=http \
  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=default \
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```
The following argument:
```shell
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```
can be replaced with the following, to avoid copying the JARs to the Spark client node:
```shell
  --repositories https://{maven-central-mirror or private-nexus-repo} \
  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
```
Basic Operations in the Shell
Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table:
```
scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
|  default|
|   system|
|  test_db|
+---------+

scala> spark.sql("""
     | CREATE TABLE test_db.tbl (
     |   create_time TIMESTAMP NOT NULL,
     |   m           INT       NOT NULL COMMENT 'part key',
     |   id          BIGINT    NOT NULL COMMENT 'sort key',
     |   value       STRING
     | ) USING ClickHouse
     | PARTITIONED BY (m)
     | TBLPROPERTIES (
     |   engine = 'MergeTree()',
     |   order_by = 'id',
     |   settings.index_granularity = 8192
     | )
     | """)
res2: org.apache.spark.sql.DataFrame = []

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.createDataFrame(Seq(
    ("2021-01-01 10:10:10", 1L, "1"),
    ("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
    .withColumn("create_time", to_timestamp($"create_time"))
    .withColumn("m", month($"create_time"))
    .select($"create_time", $"m", $"id", $"value")
    .writeTo("test_db.tbl")
    .append

// Exiting paste mode, now interpreting.

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
|        create_time|  m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10|  1|  1|    1|
|2022-02-02 10:10:10|  2|  2|    2|
+-------------------+---+---+-----+

scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
|        create_time|  m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10|  2|  2|    2|
+-------------------+---+---+-----+
```
Execute ClickHouse native SQL:
```
scala> val options = Map(
     |     "host" -> "clickhouse",
     |     "protocol" -> "http",
     |     "http_port" -> "8123",
     |     "user" -> "default",
     |     "password" -> ""
     | )

scala> val sql = """
     | |CREATE TABLE test_db.person (
     | |  id    Int64,
     | |  name  String,
     | |  age Nullable(Int32)
     | |)
     | |ENGINE = MergeTree()
     | |ORDER BY id
     | """.stripMargin

scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options)

scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  test_db|   person|      false|
+---------+---------+-----------+

scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)
```
Supported Data Types
This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide a quick reference for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.
Reading Data from ClickHouse into Spark
| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes | 
|---|---|---|---|---|
| Nothing | NullType | ✅ | Yes | |
| Bool | BooleanType | ✅ | Yes | |
| UInt8, Int16 | ShortType | ✅ | Yes | |
| Int8 | ByteType | ✅ | Yes | |
| UInt16, Int32 | IntegerType | ✅ | Yes | |
| UInt32, Int64, UInt64 | LongType | ✅ | Yes | |
| Int128, UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Yes | |
| Float32 | FloatType | ✅ | Yes | |
| Float64 | DoubleType | ✅ | Yes | |
| String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Yes | |
| FixedString | BinaryType, StringType | ✅ | Yes | Controlled by the configuration READ_FIXED_STRING_AS | 
| Decimal | DecimalType | ✅ | Yes | Precision and scale up to Decimal128 | 
| Decimal32 | DecimalType(9, scale) | ✅ | Yes | |
| Decimal64 | DecimalType(18, scale) | ✅ | Yes | |
| Decimal128 | DecimalType(38, scale) | ✅ | Yes | |
| Date, Date32 | DateType | ✅ | Yes | |
| DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Yes | |
| Array | ArrayType | ✅ | No | Array element type is also converted | 
| Map | MapType | ✅ | No | Keys are limited to StringType | 
| IntervalYear | YearMonthIntervalType(Year) | ✅ | Yes | |
| IntervalMonth | YearMonthIntervalType(Month) | ✅ | Yes | |
| IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | No | The specific interval type is used | 
| Object | | ❌ | | |
| Nested | | ❌ | | |
| Tuple | | ❌ | | |
| Point | | ❌ | | |
| Polygon | | ❌ | | |
| MultiPolygon | | ❌ | | |
| Ring | | ❌ | | |
| IntervalQuarter | | ❌ | | |
| IntervalWeek | | ❌ | | |
| Decimal256 | | ❌ | | |
| AggregateFunction | | ❌ | | |
| SimpleAggregateFunction | | ❌ | | |
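As a quick sanity check of these read-side mappings, you can load a table through the catalog and print the Spark schema the connector resolves it to. A spark-shell sketch reusing the test_db.tbl table created earlier; the output should look roughly like the following, per the mappings above (DateTime -> TimestampType, Int32 -> IntegerType, Int64 -> LongType, String -> StringType):

```scala
scala> spark.table("clickhouse.test_db.tbl").printSchema
root
 |-- create_time: timestamp (nullable = false)
 |-- m: integer (nullable = false)
 |-- id: long (nullable = false)
 |-- value: string (nullable = true)
```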
Inserting Data from Spark into ClickHouse
| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes | 
|---|---|---|---|---|
| BooleanType | UInt8 | ✅ | Yes | |
| ByteType | Int8 | ✅ | Yes | |
| ShortType | Int16 | ✅ | Yes | |
| IntegerType | Int32 | ✅ | Yes | |
| LongType | Int64 | ✅ | Yes | |
| FloatType | Float32 | ✅ | Yes | |
| DoubleType | Float64 | ✅ | Yes | |
| StringType | String | ✅ | Yes | |
| VarcharType | String | ✅ | Yes | |
| CharType | String | ✅ | Yes | |
| DecimalType | Decimal(p, s) | ✅ | Yes | Precision and scale up to Decimal128 | 
| DateType | Date | ✅ | Yes | |
| TimestampType | DateTime | ✅ | Yes | |
| ArrayType (list, tuple, or array) | Array | ✅ | No | Array element type is also converted | 
| MapType | Map | ✅ | No | Keys are limited to StringType | 
| Object | | ❌ | | |
| Nested | | ❌ | | |
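The non-primitive rows above (ArrayType, MapType) convert their element and value types recursively, and map keys must be strings. A hedged spark-shell sketch of writing such columns, assuming a table test_db.tbl_types was pre-created with a compatible schema; the table and column names are illustrative:

```scala
// id -> Int64, tags -> Array(String), attrs -> Map(String, Int32),
// per the mapping table above.
import spark.implicits._

val df = Seq(
  (1L, Seq("a", "b"), Map("k1" -> 1, "k2" -> 2))
).toDF("id", "tags", "attrs")

df.writeTo("clickhouse.test_db.tbl_types").append()
```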
Spark JDBC
Read Data
```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadExample {
    public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // Load the table from ClickHouse
        Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);

        // Show the DataFrame
        df.show();

        // Stop the Spark session
        spark.stop();
    }
}
```
Write Data
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WriteExample {
    public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "******");

        // Create a sample DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Append the DataFrame to the ClickHouse table
        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "my_table", jdbcProperties);

        // Show the DataFrame
        df.show();

        // Stop the Spark session
        spark.stop();
    }
}
```
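For Scala users, the same JDBC round trip looks like the following minimal sketch. It assumes a local ClickHouse with default credentials and the clickhouse-jdbc "all" JAR on the classpath; my_table is the illustrative table from the example above:

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object ClickHouseJdbcRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("example").master("local").getOrCreate()
    import spark.implicits._

    val jdbcUrl = "jdbc:ch://localhost:8123/default"
    val props = new Properties()
    props.put("user", "default")
    props.put("password", "")

    // Write a small DataFrame, then read it back through the same URL.
    val df = Seq((1, "John"), (2, "Doe")).toDF("id", "name")
    df.write.mode(SaveMode.Append).jdbc(jdbcUrl, "my_table", props)
    spark.read.jdbc(jdbcUrl, "my_table", props).show()

    spark.stop()
  }
}
```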