STREAM-LOAD
STREAM LOAD
Description
Stream Load 通过 HTTP 协议向 PALO 传输并导入数据。该方式主要用户本地数据的上传的导入。但其本质上是一种导入框架,其提供的 HTTP 接口,不仅能够支持本地数据的传输,也能够支持内存数据、管道数据等向 HTTP 端口传输数据。
- 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
- 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。
本文档主要通过 cURL 命令来介绍 Stream Load 的使用方式
curl -XPUT --location-trusted -u user:passwd \
[-H "header1: xxx" -H "header2: xxx" ...] \
-T data.txt \
http://host:port/api/{db}/{table}/_stream_load
- HTTP 的请求方式为
PUT
- 当前支持 HTTP chunked 与非 chunked 上传两种方式。对于非 chunked 方式,Header 中必须包含
Content-Length
来标识上传内容的长度,以保证数据的完整性。 - Header 中建议包含
Expect Header: 100-continue
,这样可以在某些出错场景下避免不必要的数据传输。 -
命令的目标
host:port
有两种方式:- 指向 FE 的 HTTP 协议端口。这种方式,FE 会直接将请求做 307 转发到随机的一个 BE 节点。最终请求和数据直接和这个 BE 节点通讯。这种方式需要客户端和 BE 节点的网络能够正常通讯。
- 指向 BE 的 HTTP 协议端口。则请求直接和 BE 节点交互。
注:百度云 PALO 用户请直接连接 Compute Node 的 HTTP 协议端口即可。
- 在 URL 的
{db}
和{table}
两个 Path Parameter 中指定要导入的数据库和表。 -
导入任务的其他参数均在 Header 中指定:
-
label
为导入任务指定一个 Label,用于唯一标示这个作业。如果不指定,则系统会自动生成一个 UUID 作为 Label。
-H "label: my_label1"
-
column_separator
用于指定导入文件中的列分隔符,默认为
\t
。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如hive文件的分隔符\x01
,需要指定为\x01
。-H "column_separator: ,"
-
columns
用于指定文件列和表中列的映射关系,以及各种列转换等。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
-H "columns: k1, k2, tmpk1, k3 = tmpk1 + 1"
-
where
根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
-H "where: k1 > 100"
-
max_filter_ratio
最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。
-H "max_filter_ratio: 0.01"
-
partitions
指定需要导入的分区。
-H "partitions: p1, p2"
-
timeout
指定导入的超时时间。单位秒。默认是 600 秒。可设置范围为 1 秒 ~ 14400 秒。
-H "timeout: 120"
-
strict_mode
是否对数据进行严格限制。默认为 false。
-H "strict_mode: true"
-
timezone
指定本次导入所使用的时区。默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
-H "timezone: Asia/Shanghai"
-
exec_mem_limit
导入内存限制。默认为 2GB。单位为字节。
-H "exec_mem_limit: 4294967296"
-
format
指定导入数据格式。支持
csv
和json
两种格式。默认为csv
。-H "format: json"
-
jsonpaths
当导入数据格式为 json 时,可以通过 jsonpaths 指定抽取 Json 数据中的字段。
-H "jsonpaths: [\"$.k2\", \"$.k1\"]"
-
strip_outer_array
当导入数据格式为 json 时,
strip_outer_array
为true
表示 Json 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。-H "strip_outer_array: true"
-
json_root
当导入数据格式为 json 时,可以通过
json_root
指定 Json 数据的根节点。PALO 将通过json_root
抽取根节点的元素进行解析。默认为空。-H "json_root: $.RECORDS"
-
merge_type
数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合
delete
参数使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。-H "merge_type: MERGE"
-
delete: 仅在 MERGE 类型下有意义,用于指定 Delete Flag 列以及标示删除标记的条件。
-H "delete: col3 = 1"
-
function_column.sequence_col
仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。
-H "function_column.sequence_col: col3"
-
fuzzy_parse
当导入数据格式是 Json 数组形式,并且数组中每一行的字段顺序完全一致。可以开启此参数,以加速导入速度。通常需配合
strip_outer_array: true
一起使用。详情可见 JSON格式数据导入说明。-H "fuzzy_parse: true"
-
Example
-
导入本地文件 testData,并指定超时时间
curl --location-trusted -u admin -H "label:label1" -H "timeout:100" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入本地文件 testData,并按条件过滤数据
curl --location-trusted -u admin -H "label:label2" -H "where: k1=20180601" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入本地文件 testData,并设置最大容许错误率
curl --location-trusted -u admin -H "label:label3" -H "max_filter_ratio:0.2" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入本地文件 testData,并指定列映射关系
curl --location-trusted -u admin -H "label:label4" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入本地文件 testData,并指定分区,以及最大容许错误率
curl --location-trusted -u admin -H "label:label5" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/example_db/my_table/_stream_load
-
使用streaming方式导入
seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u admin -T - http://host:port/api/example_db/my_table/_stream_load
-
导入含有HLL列的表
curl --location-trusted -u admin -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入含有 BITMAP 列的表
curl --location-trusted -u admin -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入 Json 数据,使用简单模式。即 Json 数据中的字段名即为列名。
表结构为:
category varchar(512) author varchar(512) title varchar(512) price double
Json 数据:
{"category":"C++","author":"avc","title":"C++ primer","price":895}
导入命令:
curl --location-trusted -u admin -H "label:label10" -H "format: json" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入 Json 数据,使用 jsonpath 抽取字段
json数据格式:
{"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895},
通过 jsonpath 抽取
category, author, price
三个字段。curl --location-trusted -u admin -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入 Json 数据,指定 Json document 根节点,并展平数组。
json数据格式:
{ "RECORDS":[ {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587}, {"category":"22","author":"2avc","price":895,"timestamp":1589191487}, {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387} ] }
通过 jsonpath 抽取
category, author, price
三个字段。curl --location-trusted -u admin -H "columns: category, price, author" -H "label:label12" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/example_db/my_table/_stream_load
-
使用 DELETE 模式,删除与这批导入 key 相同的数据
curl --location-trusted -u admin -H "merge_type: DELETE" -T testData http://host:port/api/example_db/my_table/_stream_load
-
使用 MERGE 模式。将一批数据中与
flag
列为 true 的数据相匹配的列删除,其他行正常追加curl --location-trusted -u admin -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/example_db/my_table/_stream_load
-
导入数据到含有 Sequence Col 列的 Unique Key 模型表中
curl --location-trusted -u admin -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/example_db/my_table/_stream_load
Keywords
STREAM, LOAD
典型实践
-
查看导入任务状态
Stream Load 是一个同步导入过程,语句执行成功即代表数据导入成功。导入的执行结果会通过 HTTP 返回值同步返回。并以 Json 格式展示。示例如下:
{ "TxnId": 17, "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5", "Status": "Success", "Message": "OK", "NumberTotalRows": 5, "NumberLoadedRows": 5, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 28, "LoadTimeMs": 27, "BeginTxnTimeMs": 0, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 0, "WriteDataTimeMs": 3, "CommitAndPublishTimeMs": 18 }
字段释义如下:
- TxnId:导入事务ID,由系统自动生成,全局唯一。
- Label:导入Label,如果没有指定,则系统会生成一个 UUID。
-
Status:
导入结果。有如下取值:
- Success:表示导入成功,并且数据已经可见。
- Publish Timeout:该状态也表示导入已经完成,只是数据可能会延迟可见。
- Label Already Exists:Label 重复,需更换 Label。
- Fail:导入失败。
-
ExistingJobStatus:
已存在的 Label 对应的导入作业的状态。
这个字段只有在当 Status 为 "Label Already Exists" 是才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行,"FINISHED" 表示作业成功。
- Message:导入错误信息。
- NumberTotalRows:导入总处理的行数。
- NumberLoadedRows:成功导入的行数。
- NumberFilteredRows:数据质量不合格的行数。
- NumberUnselectedRows:被 where 条件过滤的行数。
- LoadBytes:导入的字节数。
- LoadTimeMs:导入完成时间。单位毫秒。
- BeginTxnTimeMs:向 FE 请求开始一个事务所花费的时间,单位毫秒。
- StreamLoadPutTimeMs:向 FE 请求获取导入数据执行计划所花费的时间,单位毫秒。
- ReadDataTimeMs:读取数据所花费的时间,单位毫秒。
- WriteDataTimeMs:执行写入数据操作所花费的时间,单位毫秒。
- CommitAndPublishTimeMs:向Fe请求提交并且发布事务所花费的时间,单位毫秒。
- ErrorURL:如果有数据质量问题,通过访问这个 URL 查看具体错误行。
-
如何正确提交 Stream Load 作业和处理返回结果。
Stream Load 是同步导入操作,因此用户需同步等待命令的返回结果,并根据返回结果决定下一步处理方式。
用户首要关注的是返回结果中的
Status
字段。如果为
Success
,则一切正常,可以进行之后的其他操作。如果返回结果出现大量的
Publish Timeout
,则可能说明目前集群某些资源(如IO)紧张导致导入的数据无法最终生效。Publish Timeout
状态的导入任务已经成功,无需重试,但此时建议减缓或停止新导入任务的提交,并观察集群负载情况。如果返回结果为
Fail
,则说明导入失败,需根据具体原因查看问题。解决后,可以使用相同的 Label 重试。在某些情况下,用户的 HTTP 连接可能会异常断开导致无法获取最终的返回结果。此时可以使用相同的 Label 重新提交导入任务,重新提交的任务可能有如下结果:
Status
状态为Success
,Fail
或者Publish Timeout
。此时按照正常的流程处理即可。Status
状态为Label Already Exists
。则此时需继续查看ExistingJobStatus
字段。如果该字段值为FINISHED
,则表示这个 Label 对应的导入任务已经成功,无需在重试。如果为RUNNING
,则表示这个 Label 对应的导入任务依然在运行,则此时需每间隔一段时间(如10秒),使用相同的 Label 继续重复提交,直到Status
不为Label Already Exists
,或者ExistingJobStatus
字段值为FINISHED
为止。
-
取消导入任务
已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。
-
Label、导入事务、多表原子性
PALO 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,PALO 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 文档。
-
列映射、衍生列和过滤
PALO 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 文档。
-
错误数据过滤
PALO 的导入任务可以容忍一部分格式错误的数据。容忍率通过
max_filter_ratio
设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,PALO 会自动跳过哪些数据格式不正确的行。关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 文档。
-
严格模式
strict_mode
属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 文档。 -
超时时间
Stream Load 的默认超时时间为 10 分钟。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。
-
数据量和任务数限制
Stream Load 适合导入几个GB以内的数据,因为数据为单线程传输处理,因此导入过大的数据性能得不到保证。当有大量本地数据需要导入时,可以并行提交多个导入任务。
PALO 同时会限制集群内同时运行的导入任务数量,通常在 10-20 个不等。之后提交的导入作业会被拒绝。