使用自定义任务处理数据
更新时间:2025-09-10
在百舸平台中用户可以利用已有资源池队列或创建专门的CPU资源池,使用分布式训练任务模块提交自定义任务方式处理数据,并将处理好的数据写入到BOS、PFS、CFS等存储中供后续业务使用。
此文档提供使用百舸自定义任务处理数据的最佳实践方法和思路,针对不同业务场景,用户可参考该文档实现更加复杂的数据处理任务,亦可以创建可以并发处理的分布式处理任务。
典型场景
- 格式转换
- 数据清洗
- 数据生成
以下以将json格式的数据转换为jsonl格式为例讲解如何进行处理。
假设有批量的json格式的数据文件需要批量转换为jsonl格式,将存储在BOS的test-a存储桶的/json目录中的所有json文件批量转换为同名的jsonl格式并保存在PFS的/jsonl目录。
原始数据格式:
JSON
1[
2{"src": "www.nvidia.com", "text": "The quick brown fox", "type": "Eng", "id": "0", "title": "First Part"},
3{"src": "The Internet", "text": "jumps over the lazy dog", "type": "Eng", "id": "42", "title": "Second Part"}
4]
目标数据格式:
JSON
1{"src": "www.nvidia.com", "text": "The quick brown fox", "type": "Eng", "id": "0", "title": "First Part"}
2{"src": "The Internet", "text": "jumps over the lazy dog", "type": "Eng", "id": "42", "title": "Second Part"}
编写数据处理脚本
首先,需要编写数据处理脚本,支持使用任意编程语言编写转换脚本,我们以python编写为例。
创建json2jsonl.py、requirements.txt两个文件并保存在同一目录下。
json2jsonl.py文件:
Python
1#!/usr/bin/env python3
2"""
3JSON到JSONL批量转换脚本
4将JSON数组文件转换为JSONL格式(每行一个JSON对象)
5"""
6
7import argparse
8import json
9import sys
10from pathlib import Path
11
12
13def convert_json_to_jsonl(input_file: Path, output_file: Path) -> bool:
14 """转换单个JSON文件为JSONL格式"""
15 try:
16 with open(input_file, 'r', encoding='utf-8') as f:
17 data = json.load(f)
18
19 if not isinstance(data, list):
20 print(f"错误: {input_file.name} 不是数组格式")
21 return False
22
23 output_file.parent.mkdir(parents=True, exist_ok=True)
24
25 with open(output_file, 'w', encoding='utf-8') as f:
26 for item in data:
27 if isinstance(item, dict):
28 f.write(json.dumps(item, ensure_ascii=False) + '\n')
29
30 print(f"转换成功: {input_file.name} -> {output_file.name}")
31 return True
32
33 except Exception as e:
34 print(f"转换失败 {input_file.name}: {e}")
35 return False
36
37
38def main():
39 parser = argparse.ArgumentParser(description="JSON到JSONL批量转换")
40 parser.add_argument('--input-dir', '-i', required=True, help='输入目录')
41 parser.add_argument('--output-dir', '-o', required=True, help='输出目录')
42 parser.add_argument('--verbose', '-v', action='store_true', help='详细日志')
43 args = parser.parse_args()
44
45 input_path = Path(args.input_dir)
46 output_path = Path(args.output_dir)
47
48 if not input_path.exists():
49 print(f"错误: 输入目录不存在 {input_path}")
50 sys.exit(1)
51
52 json_files = list(input_path.rglob("*.json"))
53 if not json_files:
54 print(f"警告: 在 {input_path} 中未找到JSON文件")
55 return
56
57 print(f"找到 {len(json_files)} 个JSON文件")
58
59 success = 0
60 for json_file in json_files:
61 relative_path = json_file.relative_to(input_path)
62 output_file = output_path / relative_path.with_suffix('.jsonl')
63
64 if convert_json_to_jsonl(json_file, output_file):
65 success += 1
66
67 print(f"转换完成: {success}/{len(json_files)} 个文件成功")
68
69
70if __name__ == "__main__":
71 main()
requirements.txt文件:
Txt
1ujson>=5.0.0
2orjson>=3.0.0
此示例中没有外部依赖库,requirements.txt中的依赖仅用于演示
编写启动命令
Bash
1#!/bin/bash
2
3# JSON到JSONL批量转换脚本启动器
4set -e
5
6cd /mnt/scripts/
7
8# 默认值
9INPUT_DIR="/mnt/data/json"
10OUTPUT_DIR="/mnt/data/jsonl"
11VERBOSE="${VERBOSE:-false}"
12
13# 解析参数
14while [[ $# -gt 0 ]]; do
15 case $1 in
16 -h|--help)
17 echo "用法: $0 [-i INPUT_DIR] [-o OUTPUT_DIR] [-v]"
18 echo "环境变量: INPUT_DIR, OUTPUT_DIR, VERBOSE"
19 exit 0
20 ;;
21 -v|--verbose) VERBOSE="true"; shift ;;
22 -i|--input-dir) INPUT_DIR="$2"; shift 2 ;;
23 -o|--output-dir) OUTPUT_DIR="$2"; shift 2 ;;
24 *) echo "未知参数: $1"; exit 1 ;;
25 esac
26done
27
28# 安装依赖
29pip install -r requirements.txt
30
31# 检查Python
32if ! command -v python3 &> /dev/null; then
33 echo "错误: Python3 未安装"
34 exit 1
35fi
36
37# 检查输入目录
38if [ ! -d "$INPUT_DIR" ]; then
39 echo "错误: 输入目录不存在: $INPUT_DIR"
40 exit 1
41fi
42
43# 创建输出目录
44mkdir -p "$OUTPUT_DIR"
45
46# 构建命令
47CMD="python3 json2jsonl.py --input-dir \"$INPUT_DIR\" --output-dir \"$OUTPUT_DIR\""
48[ "$VERBOSE" = "true" ] && CMD="$CMD --verbose"
49
50# 执行转换
51echo "转换: $INPUT_DIR -> $OUTPUT_DIR"
52eval $CMD
创建自定义任务
- 进入百舸创建任务页面填写任务信息,选择镜像、填写启动命令
- 上传数据处理脚本代码到test-a存储桶的/scripts中
- 挂载BOS和PFS存储,选择源数据存储和目标存储,分别设置挂载路径为/mnt/data/json和/mnt/data/jsonl(与启动脚本中的目录保持一致),同时挂载代码目录,并设置挂载路径为/mnt/scripts(与脚本中使用的路径保持一致)
- 提交任务,等待数据处理完成,在任务详情中可以通过事件、日志查看任务运行情况
高级拓展
- 业务中常用的周期执行数据处理任务可以使用百舸OpenAPI进行任务提交,也可以集成到自有系统中进行业务调用
- 常用的处理工具打包成专门的数据处理镜像,可以避免每次上传处理脚本,直接在启动命令中调用
- 程序处理脚本程序也可以在启动命令中使用git命令从代码库中拉取或者wget从远程服务器下载
- 可以通过使用环境变量传递密钥、路径等信息进一步保证敏感信息安全