Tar文件解压
更新时间:2026-04-23
简介
解压tar文件算子
功能描述
tar文件解压, 支持批量处理
输入
| 输入 | 含义 | 默认值 |
|---|---|---|
| base_path | 支持传入base_path目录, 该目录下有多个子目录, 每个子目录有若干tar文件, 配合discover_datasets和create_tasks_from_datasets方法可便捷的将多个子目录数据集创建为多个task, 并发执行 | 无 |
输出
| 输入 | 含义 | |
|---|---|---|
| output_path | 解压后目录输出 | |
| result | 数据集转换状态:SUCCESS: 解压成功FAILED: 解压失败 |
调用示例
Plain Text
1from __future__ import annotations
2
3import os
4import daft
5from daft import col
6
7from daft.aihc.common.udf import aihc_udf
8from daft.aihc.functions.process.tar_extractor_udf import TarUncompress
9from daft.aihc.functions.process.tar_extractor_udf import discover_datasets
10from daft.aihc.functions.process.tar_extractor_udf import create_tasks_from_datasets
11
12if __name__ == "__main__":
13 if os.getenv("DAFT_RUNNER", "native") == "ray":
14 import ray
15 ray.init(dashboard_host="0.0.0.0", ignore_reinit_error=True)
16 daft.set_runner_ray()
17 daft.set_execution_config(actor_udf_ready_timeout=6000, min_cpu_per_task=0)
18
19 base_path = "/path/dataset/lerobot/"
20 datasets = discover_datasets(base_path, None, None)
21 tasks = create_tasks_from_datasets(datasets)
22 num_datasets = len(datasets)
23
24 ds = daft.from_pydict(tasks)
25 ds = ds.into_partitions(num_datasets)
26
27 ds = ds.with_column(
28 "result",
29 aihc_udf(
30 TarUncompress,
31 construct_args={
32 },
33 num_cpus=1,
34 num_gpus=0,
35 batch_size=1,
36 concurrency=num_datasets,
37 use_process=True,
38 )(col("input_path"), col("output_path")),
39 )
40 ds.show()
评价此篇文章
