BROKER LOAD
描述
Broker Load 是 PALO 的数据导入方式,主要用于从远程存储系统(如 HDFS 或 S3)导入大规模数据。它通过 MySQL API 发起,是异步导入方式。导入进度和结果可以通过 SHOW LOAD 查询。
在早期版本中,S3 和 HDFS Load 依赖于 Broker 进程,但随着版本优化,现在直接从数据源读取,不再依赖额外的 Broker 进程。尽管如此,由于语法相似,S3 Load、HDFS Load 和 Broker Load 都被统称为 Broker Load。
语法
1LOAD LABEL [<db_name>.]<load_label>
2(
3[ { MERGE | APPEND | DELETE } ]
4DATA INFILE
5(
6"<file_path>"[, ...]
7)
8[ NEGATIVE ]
9INTO TABLE `<table_name>`
10[ PARTITION ( <partition_name> [ , ... ] ) ]
11[ COLUMNS TERMINATED BY "<column_separator>" ]
12[ LINES TERMINATED BY "<line_delimiter>" ]
13[ FORMAT AS "<file_type>" ]
14[ COMPRESS_TYPE AS "<compress_type>" ]
15[ (<column_list>) ]
16[ COLUMNS FROM PATH AS (<column_name> [ , ... ] ) ]
17[ SET (<column_mapping>) ]
18[ PRECEDING FILTER <predicate> ]
19[ WHERE <predicate> ]
20[ DELETE ON <expr> ]
21[ ORDER BY <source_sequence> ]
22[ PROPERTIES ("<key>"="<value>" [, ...] ) ]
23)
24WITH BROKER "<broker_name>"
25( <broker_properties>
26 [ , ... ])
27[ PROPERTIES (
28 <load_properties>
29 [ , ... ]) ]
30[COMMENT "<comment>" ];
必选参数
1. <db_name>
指定导入的数据库名。
2. <load_label>
每个导入任务需要指定一个唯一的 Label,后续可以通过该 Label 查询作业进度。
3. <table_name>
指定导入任务对应的表。
4.<file_path>
指定需要导入的文件路径。可以是多个路径,也可以使用通配符。路径最终必须匹配到文件,若只匹配到目录则导入会失败。
5. <broker_name>
指定需要使用的 Broker 服务名称。比如在公有云 PALO 中。Broker 服务名称为
bos。
6. <broker_properties>
指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。
Text1 ( 2 "username" = "user", 3 "password" = "pass", 4 ... 5 )
可选参数
1. merge | append | delete
数据合并类型。默认为
append,表示本次导入是普通的追加写操作。merge和delete类型仅适用于 unique key 模型表。merge类型需要配合[delete on]语句使用,以标注 delete flag 列。而delete类型则表示本次导入的所有数据皆为删除数据。
2. negative
表示 "负" 导入,这种方式仅针对具有整型 sum 聚合类型的聚合数据表。将导入数据中的 sum 聚合列对应的整型数值取反,用于冲抵错误数据。
3. <partition_name>
指定仅导入表的某些分区,比如:partition (p1, p2,...),其他不在分区范围内的数据会被忽略。
4. <column_separator>
指定列分隔符,仅在 CSV 格式下有效,且只能指定单字节分隔符。
5. <line_delimiter>
指定行分隔符,仅在 CSV 格式下有效,且只能指定单字节分隔符。
6. <file_type>
指定文件格式,支持
csv(默认)、parquet、orc格式。
7. <compress_type>
指定文件压缩类型,支持
gz、bz2、lz4frame。
8. <column_list>
指定原始文件中的列顺序。
9. columns from path as (<c1>, <c2>,...)
指定从导入文件路径中抽取的列。
10. <column_mapping>
指定列的转换函数。
11. preceding filter <predicate>
数据先根据
column list和columns from path as拼接为原始数据行,再根据前置过滤条件进行过滤。
12. where <predicate>
根据条件对导入数据进行过滤。
13. delete on <expr>
配合
merge导入模式使用,仅适用于 unique key 模型的表。用于指定导入数据中表示删除标志(delete flag)的列及计算关系。
14. <source_sequence>
仅适用于 unique key 模型的表。用于指定导入数据中表示 sequence col 的列,主要用于导入时保证数据顺序。
15. properties ("<key>"="<value>",...)
指定导入文件格式的参数。适用于 CSV、JSON 等格式。例如,可以指定
json_root、jsonpaths、fuzzy_parse等参数。
enclose: 包围符;当 CSV 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。
注意:当enclose设置为"时,trim_double_quotes一定要设置为true。
escape: 转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为 "a,'b,'c'",包围符为 "'",希望 "b,'c" 被作为一个字段解析,则需要指定单字节转义符,例如 "",然后将数据修改为 "a,'b,'c'"。
16. < load_properties > 可选参数如下,并可根据实际环境情况添加。
| 参数 | 参数说明 |
|---|---|
| timeout | 导入超时时间,默认为 4 小时,单位秒。 |
| max_filter_ratio | 最大容忍可过滤(数据不规范等原因)的数据比例,默认零容忍,取值范围为 0 到 1。 |
| exec_mem_limit | 导入内存限制,默认为 2GB,单位为字节。 |
| strict_mode | 是否对数据进行严格限制,默认为 false。 |
| partial_columns | 布尔类型,为 true 时表示使用部分列更新,默认值为 false,仅在表模型为 Unique 且采用 Merge on Write 时设置。 |
| timezone | 指定时区,影响一些受时区影响的函数,如 strftime、alignment_timestamp、from_unixtime 等。 |
| load_parallelism | 导入并发度,默认为 1,调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。 |
| send_batch_parallelism | 设置发送批处理数据的并行度。如果并行度的值超过 BE 配置中的 max_send_batch_parallelism_per_job,则会使用 max_send_batch_parallelism_per_job 的值。 |
| load_to_single_tablet | 布尔类型,为 true 时表示支持将数据导入到对应分区的单个 tablet,默认值为 false,作业的任务数取决于整体并发度,仅在导入带有 random 分桶的 OLAP 表时设置。 |
| priority | 设置导入任务的优先级,可选 HIGH/NORMAL/LOW,默认为 NORMAL。对于处于 PENDING 状态的导入任务,更高优先级的任务将优先进入 LOADING 状态。 |
| comment | 指定导入任务的备注信息。 |
权限控制
执行此 SQL 命令的用户必须至少具有以下权限:
| 权限(Privilege) | 对象(Object) | 说明(Notes) |
|---|---|---|
| LOAD_PRIV | 表(Table) | 对指定的库表的导入权限 |
举例
-
从 HDFS 导入一批数据,导入文件
file.txt,按逗号分隔,导入到表my_table。SQL1LOAD LABEL example_db.label1 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt") 4 INTO TABLE `my_table` 5 COLUMNS TERMINATED BY "," 6) 7WITH BROKER hdfs 8( 9 "username"="hdfs_user", 10 "password"="hdfs_password" 11); -
从 HDFS 导入数据,使用通配符匹配两批文件。分别导入到两个表中。使用通配符匹配导入两批文件
file-10*和file-20*。分别导入到my_table1和my_table2两张表中。其中my_table1指定导入到分区p1中,并且将导入源文件中第列和第三列的值 +1 后导入。SQL1LOAD LABEL example_db.label2 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*") 4 INTO TABLE `my_table1` 5 PARTITION (p1) 6 COLUMNS TERMINATED BY "," 7 (k1, tmp_k2, tmp_k3) 8 SET ( 9 k2 = tmp_k2 + 1, 10 k3 = tmp_k3 + 1 11 ), 12 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*") 13 INTO TABLE `my_table2` 14 COLUMNS TERMINATED BY "," 15 (k1, k2, k3) 16) 17WITH BROKER hdfs 18( 19 "username"="hdfs_user", 20 "password"="hdfs_password" 21); -
从 HDFS 导入一批数据。指定分隔符为 Hive 的默认分隔符
\\x01,并使用通配符 * 指定data目录下所有目录的所文件。使用简单认证,同时配置 namenode HA。SQL1LOAD LABEL example_db.label3 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*") 4 INTO TABLE `my_table` 5 COLUMNS TERMINATED BY "\\x01" 6) 7WITH BROKER my_hdfs_broker 8( 9 "username" = "", 10 "password" = "", 11 "fs.defaultFS" = "hdfs://my_ha", 12 "dfs.nameservices" = "my_ha", 13 "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", 14 "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", 15 "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", 16 "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server. namenode.ha.ConfiguredFailoverProxyProvider" 17); -
导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断
SQL1LOAD LABEL example_db.label4 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file") 4 INTO TABLE `my_table` 5 FORMAT AS "parquet" 6 (k1, k2, k3) 7) 8WITH BROKER hdfs 9( 10 "username"="hdfs_user", 11 "password"="hdfs_password" 12); -
导入数据,并提取文件路径中的分区字段。
my_table表中的列为k1, k2, k3, city, utc_date。其中hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing目下包括如下文件:Text1hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv 2hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv 3hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv 4hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv文件中只包含
k1, k2, k3三列数据,city, utc_date这两列数据会从文件路径中提取。SQL1LOAD LABEL example_db.label10 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*") 4 INTO TABLE `my_table` 5 FORMAT AS "csv" 6 (k1, k2, k3) 7 COLUMNS FROM PATH AS (city, utc_date) 8) 9WITH BROKER hdfs 10( 11 "username"="hdfs_user", 12 "password"="hdfs_password" 13); -
对待导入数据进行过滤。只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。
SQL1LOAD LABEL example_db.label6 2( 3 DATA INFILE("hdfs://host:port/input/file") 4 INTO TABLE `my_table` 5 (k1, k2, k3) 6 SET ( 7 k2 = k2 + 1 8 ) 9 PRECEDING FILTER k1 = 1 10 WHERE k1 > k2 11) 12WITH BROKER hdfs 13( 14 "username"="user", 15 "password"="pass" 16); -
导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换)
SQL1LOAD LABEL example_db.label7 2( 3 DATA INFILE("hdfs://host:port/user/data/*/test.txt") 4 INTO TABLE `tbl12` 5 COLUMNS TERMINATED BY "," 6 (k2,k3) 7 COLUMNS FROM PATH AS (data_time) 8 SET ( 9 data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s') 10 ) 11) 12WITH BROKER hdfs 13( 14 "username"="user", 15 "password"="pass" 16);路径下有如下文件:
Text1/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt 2/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt表结构为:
Text1data_time DATETIME, 2k2 INT, 3k3 INT -
从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中 v2 大于 100 的列相匹配的列删除,其他列正常导入
SQL1LOAD LABEL example_db.label8 2( 3 MERGE DATA INFILE("HDFS://test:802/input/file") 4 INTO TABLE `my_table` 5 (k1, k2, k3, v2, v1) 6 DELETE ON v2 > 100 7) 8WITH HDFS 9( 10 "hadoop.username"="user", 11 "password"="pass" 12) 13PROPERTIES 14( 15 "timeout" = "3600", 16 "max_filter_ratio" = "0.1" 17);使用 MERGE 方式导入。
my_table必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。
-
导入时指定 source_sequence 列,保证 UNIQUE_KEYS 表中的替换顺序:
SQL1LOAD LABEL example_db.label9 2( 3 DATA INFILE("HDFS://test:802/input/file") 4 INTO TABLE `my_table` 5 COLUMNS TERMINATED BY "," 6 (k1,k2,source_sequence,v1,v2) 7 ORDER BY source_sequence 8) 9WITH HDFS 10( 11 "hadoop.username"="user", 12 "password"="pass" 13)my_table必须是 Unique Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中source_sequence列的值来保证顺序性。 -
从 HDFS 导入一批数据,指定文件格式为
json并指定json_root、jsonpathsSQL1LOAD LABEL example_db.label10 2( 3 DATA INFILE("HDFS://test:port/input/file.json") 4 INTO TABLE `my_table` 5 FORMAT AS "json" 6 PROPERTIES( 7 "json_root" = "$.item", 8 "jsonpaths" = "[$.id, $.city, $.code]" 9 ) 10) 11with HDFS ( 12"hadoop.username" = "user" 13"password" = "" 14) 15PROPERTIES 16( 17"timeout"="1200", 18"max_filter_ratio"="0.1" 19);jsonpaths可与column list及SET (column_mapping)配合:SQL1LOAD LABEL example_db.label10 2( 3 DATA INFILE("HDFS://test:port/input/file.json") 4 INTO TABLE `my_table` 5 FORMAT AS "json" 6 (id, code, city) 7 SET (id = id * 10) 8 PROPERTIES( 9 "json_root" = "$.item", 10 "jsonpaths" = "[$.id, $.code, $.city]" 11 ) 12) 13with HDFS ( 14"hadoop.username" = "user" 15"password" = "" 16) 17PROPERTIES 18( 19"timeout"="1200", 20"max_filter_ratio"="0.1" 21); -
从腾讯云 cos 中以 csv 格式导入数据。
SQL1LOAD LABEL example_db.label10 2( 3DATA INFILE("cosn://my_bucket/input/file.csv") 4INTO TABLE `my_table` 5(k1, k2, k3) 6) 7WITH BROKER "broker_name" 8( 9 "fs.cosn.userinfo.secretId" = "xxx", 10 "fs.cosn.userinfo.secretKey" = "xxxx", 11 "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com" 12) -
导入 CSV 数据时去掉双引号,并跳过前 5 行。
SQL1LOAD LABEL example_db.label12 2( 3DATA INFILE("cosn://my_bucket/input/file.csv") 4INTO TABLE `my_table` 5(k1, k2, k3) 6PROPERTIES("trim_double_quotes" = "true", "skip_lines" = "5") 7) 8WITH BROKER "broker_name" 9( 10 "fs.cosn.userinfo.secretId" = "xxx", 11 "fs.cosn.userinfo.secretKey" = "xxxx", 12 "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com" 13)
