BROKER-LOAD
BROKER LOAD
Description
该命令主要用于通过 Broker 服务进程来导入远端存储(如BOS、HDFS)上的数据。
1LOAD LABEL load_label
2(
3data_desc1[, data_desc2, ...]
4)
5WITH BROKER broker_name
6[broker_properties]
7[load_properties];
-
load_label
每个导入需要指定一个唯一的 Label。后续可以通过这个 label 来查看作业进度。
[database.]label_name
-
data_desc1
用于描述一组需要导入的文件。
SQL1[MERGE|APPEND|DELETE] 2DATA INFILE 3( 4"file_path1"[, file_path2, ...] 5) 6[NEGATIVE] 7INTO TABLE `table_name` 8[PARTITION (p1, p2, ...)] 9[COLUMNS TERMINATED BY "column_separator"] 10[FORMAT AS "file_type"] 11[(column_list)] 12[COLUMNS FROM PATH AS (c1, c2, ...)] 13[PRECEDING FILTER predicate] 14[SET (column_mapping)] 15[WHERE predicate] 16[DELETE ON expr] 17[ORDER BY source_sequence]
-
[MERGE|APPEND|DELETE]
数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合
[DELETE ON]
语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。 -
DATA INFILE
指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。
-
NEGTIVE
该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。
-
PARTITION(p1, p2, ...)
可以指定仅导入表的某些分区。不再分区范围内的数据将被忽略。
-
COLUMNS TERMINATED BY
指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。
-
FORMAT AS
指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。
-
column list
用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
(k1, k2, tmpk1)
-
COLUMNS FROM PATH AS
指定从导入文件路径中抽取的列。
-
PRECEDING FILTER predicate
前置过滤条件。数据首先根据
column list
和COLUMNS FROM PATH AS
按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。 -
SET (column_mapping)
指定列的转换函数。
-
WHERE predicate
根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
-
DELETE ON expr
需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。
-
ORDER BY
仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。
-
-
WITH BROKER broker_name
指定需要使用的 Broker 服务名称。在公有云 PALO 中。Broker 服务名称为
bos
-
broker_properties
指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。关于具体信息,可参阅 Broker 文档。
Plain Text1( 2 "key1" = "val1", 3 "key2" = "val2", 4 ... 5)
-
load_properties
指定导入的相关参数。目前支持以下参数:
-
timeout
导入超时时间。默认为 4 小时。单位秒。
-
max_filter_ratio
最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。
-
exec_mem_limit
导入内存限制。默认为 2GB。单位为字节。
-
strict_mode
是否对数据进行严格限制。默认为 false。
-
timezone
指定某些受时区影响的函数的时区,如
strftime/alignment_timestamp/from_unixtime
等等,具体请查阅 时区 文档。如果不指定,则使用 "Asia/Shanghai" 时区。
-
Example
-
从 BOS 导入一批数据
SQL1LOAD LABEL example_db.label1 2( 3 DATA INFILE("bos://my_bucket/input/file.txt") 4 INTO TABLE `my_table` 5 COLUMNS TERMINATED BY "," 6) 7WITH BROKER bos 8( 9 "bos_endpoint" = "http://bj.bcebos.com", 10 "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", 11 "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" 12);
导入文件
file.txt
,按逗号分隔,导入到表my_table
。 -
从 BOS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中。
SQL1LOAD LABEL example_db.label2 2( 3 DATA INFILE("bos://my_bucket/input/file-10*") 4 INTO TABLE `my_table1` 5 PARTITION (p1) 6 COLUMNS TERMINATED BY "," 7 (k1, tmp_k2, tmp_k3) 8 SET ( 9 k2 = tmp_k2 + 1, 10 k3 = tmp_k3 + 1 11 ) 12 DATA INFILE("bos://my_bucket/input/file-20*") 13 INTO TABLE `my_table2` 14 COLUMNS TERMINATED BY "," 15 (k1, k2, k3) 16) 17WITH BROKER bos 18( 19 "bos_endpoint" = "http://bj.bcebos.com", 20 "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", 21 "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" 22);
使用通配符匹配导入两批文件
file-10*
和file-20*
。分别导入到my_table1
和my_table2
两张表中。其中my_table1
指定导入到分区p1
中,并且将导入源文件中第二列和第三列的值 +1 后导入。 -
从 HDFS 导入一批数据。
SQL1LOAD LABEL example_db.label3 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*") 4 INTO TABLE `my_table` 5 COLUMNS TERMINATED BY "\\x01" 6) 7WITH BROKER my_hdfs_broker 8( 9 "username" = "", 10 "password" = "", 11 "dfs.nameservices" = "my_ha", 12 "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", 13 "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", 14 "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", 15 "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" 16);
指定分隔符为 Hive 的默认分隔符
\\x01
,并使用通配符 * 指定data
目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。 -
导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断
SQL1LOAD LABEL example_db.label4 2( 3 DATA INFILE("bos://bucket/input/file") 4 INTO TABLE `my_table` 5 FORMAT AS "parquet" 6 (k1, k2, k3) 7) 8WITH BROKER bos 9( 10 "bos_endpoint" = "http://bj.bcebos.com", 11 "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", 12 "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" 13);
-
导入数据,并提取文件路径中的分区字段
SQL1LOAD LABEL example_db.label10 2( 3 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*") 4 INTO TABLE `my_table` 5 FORMAT AS "csv" 6 (k1, k2, k3) 7 COLUMNS FROM PATH AS (city, utc_date) 8) 9WITH BROKER hdfs 10( 11 "username"="hdfs_user", 12 "password"="hdfs_password" 13);
my_table
表中的列为k1, k2, k3, city, utc_date
。其中
hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing
目录下包括如下文件:Plain Text1hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv 2hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv 3hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv 4hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv
文件中只包含
k1, k2, k3
三列数据,city, utc_date
这两列数据会从文件路径中提取。 -
对待导入数据进行过滤。
SQL1LOAD LABEL example_db.label6 2( 3 DATA INFILE("bos://bucket/input/file") 4 INTO TABLE `my_table` 5 (k1, k2, k3) 6 PRECEDING FILTER k1 = 1 7 SET ( 8 k2 = k2 + 1 9 ) 10 WHERE k1 > k2 11) 12WITH BROKER bos 13( 14 "bos_endpoint" = "http://bj.bcebos.com", 15 "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", 16 "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" 17);
只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。
-
导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换)
SQL1LOAD LABEL example_db.label7 2( 3 DATA INFILE("hdfs://host:port/user/data/*/test.txt") 4 INTO TABLE `tbl12` 5 COLUMNS TERMINATED BY "," 6 (k2,k3) 7 COLUMNS FROM PATH AS (data_time) 8 SET ( 9 data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s') 10 ) 11) 12WITH BROKER hdfs 13( 14 "username"="user", 15 "password"="pass" 16);
路径下有如下文件:
Plain Text1/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt 2/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
表结构为:
Plain Text1data_time DATETIME, 2k2 INT, 3k3 INT
-
从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中v2 大于100 的列相匹配的列删除,其他列正常导入
SQL1LOAD LABEL example_db.label8 2( 3 MERGE DATA INFILE("bos://bucket/input/file") 4 INTO TABLE `my_table` 5 (k1, k2, k3, v2, v1) 6 DELETE ON v2 > 100 7) 8WITH BROKER bos 9( 10 "bos_endpoint" = "http://bj.bcebos.com", 11 "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", 12 "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" 13) 14PROPERTIES 15( 16 "timeout" = "3600", 17 "max_filter_ratio" = "0.1" 18);
使用 MERGE 方式导入。
my_table
必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。
-
导入时指定source_sequence列,保证UNIQUE_KEYS表中的替换顺序:
SQL1LOAD LABEL example_db.label9 2( 3 DATA INFILE("bos://bucket/input/file") 4 INTO TABLE `my_table` 5 COLUMNS TERMINATED BY "," 6 (k1,k2,source_sequence,v1,v2) 7 ORDER BY source_sequence 8) 9WITH BROKER bos 10( 11 "bos_endpoint" = "http://bj.bcebos.com", 12 "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", 13 "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" 14)
my_table
必须是是 Unqiue Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中source_sequence
列的值来保证顺序性。
Keywords
BROKER, LOAD
典型实践
-
查看导入任务状态
Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 SHOW LOAD 命令查看。
-
取消导入任务
已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。
-
Label、导入事务、多表原子性
PALO 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,PALO 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 文档。
-
列映射、衍生列和过滤
PALO 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 文档。
-
错误数据过滤
PALO 的导入任务可以容忍一部分格式错误的数据。容忍了通过
max_filter_ratio
设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,PALO 会自动跳过哪些数据格式不正确的行。关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 文档。
-
严格模式
strict_mode
属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 文档。 -
超时时间
Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。
-
数据量和任务数限制
Broker Load 适合在一个导入任务中导入100GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。
同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。
PALO 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。