BROKER-LOAD
BROKER LOAD
Description
该命令主要用于通过 Broker 服务进程来导入远端存储(如BOS、HDFS)上的数据。
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH BROKER broker_name
[broker_properties]
[load_properties];
-
load_label
每个导入需要指定一个唯一的 Label。后续可以通过这个 label 来查看作业进度。
[database.]label_name
-
data_desc1
用于描述一组需要导入的文件。
[MERGE|APPEND|DELETE] DATA INFILE ( "file_path1"[, file_path2, ...] ) [NEGATIVE] INTO TABLE `table_name` [PARTITION (p1, p2, ...)] [COLUMNS TERMINATED BY "column_separator"] [FORMAT AS "file_type"] [(column_list)] [COLUMNS FROM PATH AS (c1, c2, ...)] [PRECEDING FILTER predicate] [SET (column_mapping)] [WHERE predicate] [DELETE ON expr] [ORDER BY source_sequence]
-
[MERGE|APPEND|DELETE]
数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合
[DELETE ON]
语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。 -
DATA INFILE
指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。
-
NEGTIVE
该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。
-
PARTITION(p1, p2, ...)
可以指定仅导入表的某些分区。不再分区范围内的数据将被忽略。
-
COLUMNS TERMINATED BY
指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。
-
FORMAT AS
指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。
-
column list
用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
(k1, k2, tmpk1)
-
COLUMNS FROM PATH AS
指定从导入文件路径中抽取的列。
-
PRECEDING FILTER predicate
前置过滤条件。数据首先根据
column list
和COLUMNS FROM PATH AS
按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。 -
SET (column_mapping)
指定列的转换函数。
-
WHERE predicate
根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
-
DELETE ON expr
需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。
-
ORDER BY
仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。
-
-
WITH BROKER broker_name
指定需要使用的 Broker 服务名称。在公有云 PALO 中。Broker 服务名称为
bos
-
broker_properties
指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。关于具体信息,可参阅 Broker 文档。
( "key1" = "val1", "key2" = "val2", ... )
-
load_properties
指定导入的相关参数。目前支持以下参数:
-
timeout
导入超时时间。默认为 4 小时。单位秒。
-
max_filter_ratio
最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。
-
exec_mem_limit
导入内存限制。默认为 2GB。单位为字节。
-
strict_mode
是否对数据进行严格限制。默认为 false。
-
timezone
指定某些受时区影响的函数的时区,如
strftime/alignment_timestamp/from_unixtime
等等,具体请查阅 时区 文档。如果不指定,则使用 "Asia/Shanghai" 时区。
-
Example
-
从 BOS 导入一批数据
LOAD LABEL example_db.label1 ( DATA INFILE("bos://my_bucket/input/file.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" );
导入文件
file.txt
,按逗号分隔,导入到表my_table
。 -
从 BOS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中。
LOAD LABEL example_db.label2 ( DATA INFILE("bos://my_bucket/input/file-10*") INTO TABLE `my_table1` PARTITION (p1) COLUMNS TERMINATED BY "," (k1, tmp_k2, tmp_k3) SET ( k2 = tmp_k2 + 1, k3 = tmp_k3 + 1 ) DATA INFILE("bos://my_bucket/input/file-20*") INTO TABLE `my_table2` COLUMNS TERMINATED BY "," (k1, k2, k3) ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" );
使用通配符匹配导入两批文件
file-10*
和file-20*
。分别导入到my_table1
和my_table2
两张表中。其中my_table1
指定导入到分区p1
中,并且将导入源文件中第二列和第三列的值 +1 后导入。 -
从 HDFS 导入一批数据。
LOAD LABEL example_db.label3 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*") INTO TABLE `my_table` COLUMNS TERMINATED BY "\\x01" ) WITH BROKER my_hdfs_broker ( "username" = "", "password" = "", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );
指定分隔符为 Hive 的默认分隔符
\\x01
,并使用通配符 * 指定data
目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。 -
导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断
LOAD LABEL example_db.label4 ( DATA INFILE("bos://bucket/input/file") INTO TABLE `my_table` FORMAT AS "parquet" (k1, k2, k3) ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" );
-
导入数据,并提取文件路径中的分区字段
LOAD LABEL example_db.label10 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*") INTO TABLE `my_table` FORMAT AS "csv" (k1, k2, k3) COLUMNS FROM PATH AS (city, utc_date) ) WITH BROKER hdfs ( "username"="hdfs_user", "password"="hdfs_password" );
my_table
表中的列为k1, k2, k3, city, utc_date
。其中
hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing
目录下包括如下文件:hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv
文件中只包含
k1, k2, k3
三列数据,city, utc_date
这两列数据会从文件路径中提取。 -
对待导入数据进行过滤。
LOAD LABEL example_db.label6 ( DATA INFILE("bos://bucket/input/file") INTO TABLE `my_table` (k1, k2, k3) PRECEDING FILTER k1 = 1 SET ( k2 = k2 + 1 ) WHERE k1 > k2 ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" );
只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。
-
导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换)
LOAD LABEL example_db.label7 ( DATA INFILE("hdfs://host:port/user/data/*/test.txt") INTO TABLE `tbl12` COLUMNS TERMINATED BY "," (k2,k3) COLUMNS FROM PATH AS (data_time) SET ( data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s') ) ) WITH BROKER hdfs ( "username"="user", "password"="pass" );
路径下有如下文件:
/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
表结构为:
data_time DATETIME, k2 INT, k3 INT
-
从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中v2 大于100 的列相匹配的列删除,其他列正常导入
LOAD LABEL example_db.label8 ( MERGE DATA INFILE("bos://bucket/input/file") INTO TABLE `my_table` (k1, k2, k3, v2, v1) DELETE ON v2 > 100 ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );
使用 MERGE 方式导入。
my_table
必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。
-
导入时指定source_sequence列,保证UNIQUE_KEYS表中的替换顺序:
LOAD LABEL example_db.label9 ( DATA INFILE("bos://bucket/input/file") INTO TABLE `my_table` COLUMNS TERMINATED BY "," (k1,k2,source_sequence,v1,v2) ORDER BY source_sequence ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" )
my_table
必须是是 Unqiue Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中source_sequence
列的值来保证顺序性。
Keywords
BROKER, LOAD
典型实践
-
查看导入任务状态
Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 SHOW LOAD 命令查看。
-
取消导入任务
已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。
-
Label、导入事务、多表原子性
PALO 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,PALO 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 文档。
-
列映射、衍生列和过滤
PALO 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 文档。
-
错误数据过滤
PALO 的导入任务可以容忍一部分格式错误的数据。容忍了通过
max_filter_ratio
设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,PALO 会自动跳过哪些数据格式不正确的行。关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 文档。
-
严格模式
strict_mode
属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 文档。 -
超时时间
Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。
-
数据量和任务数限制
Broker Load 适合在一个导入任务中导入100GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。
同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。
PALO 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。