从Flink导入
更新时间:2024-11-25
使用Flink ClickHouse 连接器进行导入。
表一 连接器选择
选项 | 默认 | 类型 | 描述 | |
---|---|---|---|---|
网址 | 必填 | none | String | 格式的 ClickHouse jdbc url clickhouse://<host>:<port> |
用户名 | 选填 | none | String | 如果指定了“用户名”和“密码”,则必须同时指定。 |
密码 | 选填 | none | String | ClickHouse 密码。 |
数据库名称 | 选填 | default | String | ClickHouse 数据库名称。 |
表名 | 必填 | none | String | ClickHouse 表名。 |
使用本地 | 选填 | false | Boolean | 在分布式表引擎的情况下直接读取/写入本地表。 |
sink.flush 间隔 | 选填 | 1000 | Integer | 最大刷新大小,超过此大小将会刷新数据。 |
sink.flush 间隔 | 选填 | 1s | Duration | 在此刷新间隔时间内,异步线程将刷新数据。 |
sink.max-重试次数 | 选填 | 3 | Integer | 将记录写入数据库失败时的最大重试次数。 |
sink.update 策略 | 选填 | update | String | 将 UPDATE_AFTER 类型的记录转换为更新/插入语句或者直接丢弃它,可用:更新、插入、丢弃。 |
sink.partition-策略 | 选填 | balanced | String | 分区策略:平衡(循环)、哈希(分区键)、随机(随机)。 |
接收器.分区键 | 选填 | none | String | 用于哈希策略的分区键。 |
接收器分片使用表定义 | 选填 | false | Boolean | 分片策略与分布式表定义一致,若设置为true,则会覆盖sink.partition-strategy 和sink.partition-key 的配置。 |
sink.ignore-删除 | 选填 | true | Integer | 是否忽略删除语句。 |
接收器并行性 | 选填 | none | String | 为接收器定义自定义并行性。 |
扫描.分区.列 | 选填 | none | Integer | 用于对输入进行分区的列名。 |
扫描分区号 | 选填 | none | Long | 分区的数量。 |
扫描分区下限 | 选填 | none | Long | 第一个分区的最小值。 |
是否忽略主键 | 选填 | true | Boolean | 使用 ClickHouseCatalog 创建表时是否忽略主键。 |
特性 | 选填 | none | String | 这可以设置并传递clickhouse-jdbc 配置。 |
查找缓存 | 选填 | none | String | 该查询表的缓存策略,包括NONE和PARTIAL(暂不支持FULL) |
查找部分缓存访问后过期 | 选填 | none | Duration | 访问后缓存中的条目过期的持续时间,超过此时间,最旧的行将会过期。 |
查找部分缓存写入后过期 | 选填 | none | Duration | 写入后缓存中的条目过期的持续时间,超过此时间,最旧的行将会过期。 |
查找缓存最大行数 | 选填 | none | Long | 查找缓存的最大行数,超过此值,最旧的行将会过期。 |
查找部分缓存缺少键 | 选填 | true | Boolean | 标记缓存丢失的密钥,默认为 true |
查找最大重试数 | 选填 | 3 | Integer | 查找数据库失败时的最大重试次数。 |
更新/删除数据注意事项:
- 分布式表不支持更新/删除语句,如果要使用更新/删除报表,请确保将记录写入本地表或将use local设置为true。
- 数据由主键更新和删除,在分区表中使用时请注意这一点。
数据类型映射
Flink 类型 | ClickHouse 类型 |
---|---|
CHAR | String |
VARCHAR | String / IP / UUID |
STRING | String / Enum |
BOOLEAN | UInt8 |
BYTES | FixedString |
DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
TINYINT | Int8 |
SMALLINT | Int16 / UInt8 |
INTEGER | Int32 / UInt16 / Interval |
BIGINT | Int64 / UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
DATE | Date |
TIME | DateTime |
TIMESTAMP | DateTime |
TIMESTAMP_LTZ | DateTime |
INTERVAL_YEAR_MONTH | Int32 |
INTERVAL_DAY_TIME | Int64 |
ARRAY | Array |
MAP | Map |
ROW | Not supported |
MULTISET | Not supported |
RAW | Not supported |
Maven依赖关系
该项目未发布到maven中央存储库,在使用之前,需要部署/安装到自己的存储库,步骤如下:
# clone the project
git clone https://github.com/itinycheng/flink-connector-clickhouse.git
# enter the project directory
cd flink-connector-clickhouse/
# display remote branches
git branch -r
# checkout the branch you need
git checkout $branch_name
# install or deploy the project to our own repository
mvn clean install -DskipTests
mvn clean deploy -DskipTests
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
使用步骤
- 创建和读/写表:
-- register a clickhouse table `t_user` in flink sql.
CREATE TABLE t_user (
`user_id` BIGINT,
`user_type` INTEGER,
`language` STRING,
`country` STRING,
`gender` STRING,
`score` DOUBLE,
`list` ARRAY<STRING>,
`map` Map<STRING, BIGINT>,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://{ip}:{port}',
'database-name' = 'tutorial',
'table-name' = 'users',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
-- read data from clickhouse
SELECT user_id, user_type from t_user;
-- write data into the clickhouse table from the table `T`
INSERT INTO t_user
SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
- 创建和使用ClickHouseCatalog:
- Scala
val tEnv = TableEnvironment.create(setting)
val props = new util.HashMap[String, String]()
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
val cHcatalog = new ClickHouseCatalog("clickhouse", props)
tEnv.registerCatalog("clickhouse", cHcatalog)
tEnv.useCatalog("clickhouse")
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
- Java
TableEnvironment tEnv = TableEnvironment.create(setting);
Map<String, String> props = new HashMap<>();
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
tEnv.registerCatalog("clickhouse", cHcatalog);
tEnv.useCatalog("clickhouse");
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
- SQL
> CREATE CATALOG clickhouse WITH (
'type' = 'clickhouse',
'url' = 'clickhouse://127.0.0.1:8123',
'username' = 'username',
'password' = 'password',
'database-name' = 'default',
'use-local' = 'false',
...
);
> USE CATALOG clickhouse;
> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
> INSERT INTO `default`.`t_user` SELECT ...;