Doris集群间数据迁移
Doris集群之间进行数据迁移主要有备份恢复、导出导入以及Catalog 3种方式。在实际业务场景中,可以根据几种迁移方式的优缺点,选择最优方式进行数据迁移。
迁移方式比较
数据迁移方式 | 优点 | 缺点 |
---|---|---|
备份恢复 | 1. 数据迁移速率快 | 1. 要求doris集群之间的版本保持一致; 2. 依赖第三方存储; 3. 备份恢复时如果库级别数据量较大(T以上)会偶发失败,需要调整为表级别或分区级别备份恢复 |
导出导入 | 1. 数据迁移速率较快; 2. doris集群之间迁移数据时,没有版本要求; 3. 支持指定过滤条件 |
1. 依赖第三方存储 |
Catalog | 1. 数据迁移速率较快; 2. doris集群之间迁移数据时,没有版本要求; 3. 支持指定过滤条件 |
1. 要求打通源集群、目标集群之间的网络 |
数据迁移
备份恢复
方案简介
Doris 支持将当前数据以文件的形式,备份到远端存储系统中。之后可以通过恢复命令,从远端存储系统中将数据恢复到Doris 集群。
备份操作:Doris会为需要备份的表创建一个快照,包含了该表的所有数据和元数据,之后将表的数据和元数据打包成一个snapshot文件,传输到用户指定的存储位置(如HDFS)。
恢复操作:从HDFS上下载snapshot文件,然后解压缩这个文件,将数据和元数据恢复到Doris系统中的相应表中。
迁移流程
源集群
- 创建远程仓库(任选一即可)
通过broker创建hdfs仓库
1CREATE REPOSITORY `example_repo`
2WITH BROKER `hdfs_broker`
3ON LOCATION "hdfs://hadoop-name-node:54310/path/to/repo/"
4PROPERTIES
5(
6 "username" = "user",
7 "password" = "password"
8);
9
10# 直接创建支持s3协议的远程仓库
11CREATE REPOSITORY `s3_repo`
12WITH S3
13ON LOCATION "s3://bucket_name/test"
14PROPERTIES
15(
16 "AWS_ENDPOINT" = "http://xxxx.xxxx.com",
17 "AWS_ACCESS_KEY" = "xxxx",
18 "AWS_SECRET_KEY"="xxx",
19 "AWS_REGION" = "xxx"
20);
21
22# 通过broker创建bos仓库
23CREATE REPOSITORY `bos_repo`
24WITH BROKER `bos_broker`
25ON LOCATION "bos://palo_backup"
26PROPERTIES
27(
28 "bos_endpoint" = "http://gz.bcebos.com",
29 "bos_accesskey" = "bos_accesskey",
30 "bos_secret_accesskey"="bos_secret_accesskey"
31);
- 备份数据 备份数据库example_db下表 example_tbl的 p1 分区到远程仓库 example_repo 中,snapshot_label2为快照名称(注意修改库表名,仓库名与上述对应即可) 支持库级别、表级别、分区级别备份,如果数据量较大,备份任务可能会失败,缩小备份范围重试即可。
1# 分区级别
2BACKUP SNAPSHOT example_db.snapshot_label2
3TO example_repo
4ON
5(
6 example_tbl PARTITION (p1)
7);
8
9# 表级别
10BACKUP SNAPSHOT example_db.snapshot_label2
11TO example_repo
12ON
13(
14 example_tbl
15);
16
17# 库级别
18BACKUP SNAPSHOT example_db.snapshot_label3
19TO example_repo;
20
21# 如果备份异常(时间过长等),可以cancel备份任务
22CANCEL BACKUP FROM example_db;
- 查看 backup 执行情况 State: FINISHED、Status: [OK]表示备份任务完成。如下所示:
1mysql> show BACKUP\G;
2*************************** 1. row ***************************
3 JobId: 17891847
4 SnapshotName: snapshot_label1
5 DbName: example_db
6 State: FINISHED
7 BackupObjs: [default_cluster:example_db.example_tbl]
8 CreateTime: 2022-04-08 15:52:29
9SnapshotFinishedTime: 2022-04-08 15:52:32
10 UploadFinishedTime: 2022-04-08 15:52:38
11 FinishedTime: 2022-04-08 15:52:44
12 UnfinishedTasks:
13 Progress:
14 TaskErrMsg:
15 Status: [OK]
16 Timeout: 86400
171 row in set (0.01 sec)
- 查看远程仓库内容,确认是否备份成功(执行恢复时也会用到)
1# 查看远程仓库内容
2mysql> SHOW SNAPSHOT ON example_repo;
3+-----------------+---------------------+--------+
4| Snapshot | Timestamp | Status |
5+-----------------+---------------------+--------+
6| snapshot_label1 | 2022-04-08-15-52-29 | OK |
7+-----------------+---------------------+--------+
81 row in set (0.15 sec)
目标集群
- 创建远程仓库(同上)
- 恢复数据
从 example_repo 中恢复快照 snapshot_label1 中表 backup_tbl 的分区 p1到数据库 example_db1,时间版本与上述SHOW SNAPSHOT保持一致(注意库表与实际保持一致)。默认恢复为 3 个副本,可以在PROPERTIES中指定副本数
1# 查看远程仓库内容
2mysql> SHOW SNAPSHOT ON example_repo;
3+-----------------+---------------------+--------+
4| Snapshot | Timestamp | Status |
5+-----------------+---------------------+--------+
6| snapshot_label1 | 2022-04-08-15-52-29 | OK |
7+-----------------+---------------------+--------+
8
9
10RESTORE SNAPSHOT example_db1.snapshot_label1
11FROM `example_repo`
12ON
13(
14 `backup_tbl` PARTITION (p1),
15)
16PROPERTIES
17(
18 "backup_timestamp"="2022-04-08-15-52-29",
19 ["replication_num" = "n"]
20);
- 查看 restore 执行情况
restore成功后执行查询验证,如下表示恢复完成。
1mysql> SHOW RESTORE\G;
2*************************** 1. row ***************************
3 JobId: 17891851
4 Label: snapshot_label1
5 Timestamp: 2022-04-08-15-52-29
6 DbName: default_cluster:example_db1
7 State: FINISHED
8 AllowLoad: false
9 ReplicationNum: 3
10 RestoreObjs: {
11 "name": "snapshot_label1",
12 "database": "example_db",
13 "backup_time": 1649404349050,
14 "content": "ALL",
15 "olap_table_list": [
16 {
17 "name": "backup_tbl",
18 "partition_names": [
19 "p1",
20 "p2"
21 ]
22 }
23 ],
24 "view_list": [],
25 "odbc_table_list": [],
26 "odbc_resource_list": []
27}
28 CreateTime: 2022-04-08 15:59:01
29 MetaPreparedTime: 2022-04-08 15:59:02
30SnapshotFinishedTime: 2022-04-08 15:59:05
31DownloadFinishedTime: 2022-04-08 15:59:12
32 FinishedTime: 2022-04-08 15:59:18
33 UnfinishedTasks:
34 Progress:
35 TaskErrMsg:
36 Status: [OK]
37 Timeout: 86400
381 row in set (0.01 sec)
导出导入
方案简介
Doris提供了导入导出相关功能,通过export或者select into outfile将数据从源集群导出到第三方存储,之后再通过broker load将数据导入到目标集群,即可完成数据的跨集群迁移。 对于数据导出,export仅支持简单的过滤条件,而select into outfile支持复杂计算逻辑的,如过滤、聚合、关联等,本文数据导出方式只介绍select into outfile方式。
迁移流程
数据导出
导出命令的超时时间同查询的超时时间。如果导出超时失败,可以通过 SET query_timeout=xxx 进行设置。select into outfile为同步导入,sql返回成功,即表示数据导出成功。
1SELECT * FROM dm_platform_golden2_browse_busi_opp_prepay_ymd
2[WHERE k=xxx]
3INTO OUTFILE "s3://hyj-repo/outfile/dm_platform_golden2_browse_busi_opp_prepay_ymd/"
4FORMAT AS CSV
5PROPERTIES(
6 "s3.endpoint" = "https://s3.bj.bcebos.com",
7 "s3.region" = "bj",
8 "s3.secret_key"="xxxxxx",
9 "s3.access_key" = "xxxxxx"
10);
11
12
13# 导出到hdfs
14SELECT * FROM tbl
15INTO OUTFILE "hdfs://${host}:${fileSystem_port}/path/to/result_"
16FORMAT AS CSV
17PROPERTIES
18(
19 "fs.defaultFS" = "hdfs://ip:port",
20 "hadoop.username" = "work"
21);
数据导入
- 从第三方存储导入数据
支持表级别和分区级别。
1# 从bos导入数据,表级别
2LOAD LABEL broker_load_2025_02_07_16_22
3(
4 DATA INFILE("s3://hyj-repo/export/dwd_ctob_appoint_task/*")
5 INTO TABLE tbl
6 [COLUMNS TERMINATED BY ","] // 默认为\t
7 FORMAT AS "CSV"
8)
9WITH S3
10(
11 "provider" = "S3",
12 "AWS_ENDPOINT" = "https://s3.bj.bcebos.com",
13 "AWS_ACCESS_KEY" = "xxxxxx",
14 "AWS_SECRET_KEY"="xxxxxx",
15 "AWS_REGION" = "bj"
16)
17PROPERTIES
18(
19 "timeout" = "3600",
20 ["max_filter_ratio" = "0.9"] // 导入任务的最大容忍率,默认为0容忍,取值范围是0~1
21);
22
23# 从hdfs导入数据,分区级别
24LOAD LABEL example_db.label2
25(
26 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
27 INTO TABLE tbl
28 PARTITION (p1)
29 COLUMNS TERMINATED BY ","
30 FORMAT AS "CSV"
31)
32WITH BROKER hdfs
33(
34 "username"="hdfs_user",
35 "password"="hdfs_password"
36);
- 查看导入进度
如下State: FINISHED,Progress: 100.00%,表示数据导入成功。
1## 查看导入状态
2show load where label="broker_load_2025_02_07_16_22"\G;
3*************************** 1. row ***************************
4 JobId: 2348413
5 Label: broker_load_2025_02_07_16_22
6 State: FINISHED
7 Progress: 100.00% (10/10)
8 Type: BROKER
9 EtlInfo: unselected.rows=0; dpp.norm.ALL=803557; dpp.abnorm.ALL=0
10 TaskInfo: cluster:s3.bj.bcebos.com; timeout(s):3600; max_filter_ratio:0.0
11 ErrorMsg: NULL
12 CreateTime: 2025-02-07 16:37:41
13 EtlStartTime: 2025-02-07 16:37:43
14 EtlFinishTime: 2025-02-07 16:37:43
15 LoadStartTime: 2025-02-07 16:37:43
16LoadFinishTime: 2025-02-07 16:37:57
17 URL: NULL
18 JobDetails: {"Unfinished backends":{"ed34d6e37b23487f-a610ddfbc2d02398":[]},"ScannedRows":803557,"TaskNumber":1,"LoadBytes":1138294531,"All backends":{"ed34d6e37b23487f-a610ddfbc2d02398":[49346,24470,24469,50081,13952]},"FileNumber":1,"FileSize":818910452}
19 TransactionId: 4606947531568128
20 ErrorTablets: {}
21 User: root
22 Comment:
- 撤销导入任务 如果想撤销导入任务,可以通过如下命令进行,之后通过show broker load确认是否cancel成功。
1## cancel导出任务
2cancel load where label="broker_load_2025_02_07_16_22";
Catalog
方案简介
创建源表的catalog后,直接通过insert into select将源表的数据同步到目标表。
迁移流程
创建catalog
- 创建calalog
1CREATE CATALOG doris PROPERTIES (
2 "type"="jdbc",
3 "user"="root",
4 "password"="xxxxxx",
5 "jdbc_url" = "jdbc:mysql://100.66.35.128:9030",
6 "driver_url" = "http://xxx.bj.bcebos.com/jdbc-drivers/mysql-connector-j-8.3.0.jar",
7 "driver_class" = "com.mysql.cj.jdbc.Driver"
8);
- 查看catalog
1## 查看 catalog
2SHOW CATALOGS;
3
4## 查看 catalog 下的数据库
5SHOW DATABASES FROM doris;
6
7## 查看 catalog.db 下的数据表
8SHOW TABLES FROM doris.db;
- 查询catalog
1## 查询 catalog.db.table 的数据
2SELECT * FROM doris.db.table;
同步数据
- 创建一张与源表tbl_orig相同结构的表tbl_dest
- 将catalog中的数据同步到doris中
1# 超时时间同查询的超时时间。可以通过 SET query_timeout=xxx 进行设置。
2insert into tbl_dest select * from doris.db.tbl_orig;
增量数据迁移说明
实际生产环境中,Doris数据主要分为离线数据和增量数据。对于增量数据的迁移,可以考虑以下两种方式:
- 在生产Doris源集群数据时并行写入一份数据到目标集群;
- 周期性作业读取Doris源集群中的分区数据,并写入目标集群。