logo
10

如何使用千帆 Python SDK 进行数据清洗

如何在 SDK 中进行数据清洗

千帆 Python SDK 内集成了数据处理的能力。用户既可以在本地对数据集进行各类数据处理操作,也可以选择在千帆平台上发起一个数据处理的任务,使用平台的算力以及提供的功能,对数据集进行处理。
本篇教程将会介绍,如何使用千帆 Python SDK 发起千帆平台的数据处理任务

前置准备

在开始之前,用户首先需要安装千帆 Python SDK,使用的版本应该大于等于 0.2.8
  
  
  
  
  
  
#-# cell_skip
! pip install -U "qianfan>=0.2.8"
并且设置好 Access Key 与 Secret Key
  
  
  
  
  
  
import os
os.environ['QIANFAN_ACCESS_KEY'] = 'your_access_key'
os.environ['QIANFAN_SECRET_KEY'] = 'your_secret_key'
os.environ['QIANFAN_CONSOLE_API_RETRY_COUNT'] = "10"
os.environ['QIANFAN_CONSOLE_API_RETRY_TIMEOUT'] = "60"

创建数据集

首先,我们需要准备一个符合千帆平台泛文本格式的数据集,因为千帆平台的数据处理任务只支持这种数据格式。
本篇教程在 data_file 文件夹下提供了一个名叫 generic_example_dataset.zip 的压缩包文件,用作本篇教程的数据集内容
  
  
  
  
  
  
from zipfile import ZipFile
dataset_folder_name = "generic_example_dataset"
dataset_zip_file_path = "data_file/generic_example_dataset.zip"
if not os.path.exists(dataset_folder_name):
with ZipFile(dataset_zip_file_path) as zip_f:
zip_f.extractall(dataset_folder_name)
然后我们将解压后得到的数据集文件夹导入成为千帆 Python SDK 的 Dataset 对象
  
  
  
  
  
  
from qianfan.dataset import Dataset
ds = Dataset.load(data_file=dataset_folder_name)
print(ds.list(0))
print(ds.row_number())

上传数据集

在我们创建好 Dataset 对象后,接下来我们需要将这个数据集上传到千帆平台,以进行后续在平台上的数据处理工作
在运行前,需要用户首先开通一个位于北京的 BOS 对象存储服务,并且将变量 bos_bucket_namebos_file_path 的值分别替换为 BOS 的 Bucket 名字与你的预期存储路径
  
  
  
  
  
  
from qianfan.dataset import (
DataStorageType,
DataTemplateType,
)
from qianfan.dataset.data_source import QianfanDataSource
from qianfan.utils.utils import generate_letter_num_random_id
bos_bucket_name = "your_bucket_name"
bos_file_path = "/test/"
qianfan_data_source = QianfanDataSource.create_bare_dataset(
f"etl_dataset_{generate_letter_num_random_id(len=8)}",
DataTemplateType.GenericText,
DataStorageType.PrivateBos,
bos_bucket_name,
bos_file_path,
)
ds = ds.save(qianfan_data_source)

创建数据处理任务

在我们上传完毕数据集之后,我们就可以在平台上创建数据清洗任务了。
具体方法是,调用与对应千帆数据集绑定的 Dataset 对象的 online_data_process 方法,并向其中传入传入包含了 QianfanOperator 对象的列表,其中 QianfanOperator 对象中包含了数据处理任务的具体配置
用户可以查看 qianfan.dataset.data_operator 下所包含的所有可用于在线处理的配置算子,并且自由组合。下面演示了如何将这些算子组合在一起并且提交任务
  
  
  
  
  
  
#-# cell_skip
from qianfan.dataset.qianfan_data_operators import (
RemoveEmoji, # 移除文本中的 Emoji
RemoveInvisibleCharacter, # 移除文本中的不可见字符
RemoveNonMeaningCharacters, # 移除文本中的乱码
RemoveWebIdentifiers, # 移除文本中的网页标识符
ReplaceTraditionalChineseToSimplified, # 把繁体中文转换成简体中文
ReplaceUniformWhitespace, # 规范化空格
FilterCheckSpecialCharacters, # 检查文档的特殊字符率
FilterCheckWordRepetitionRemoval, # 检查文档的词重复率
DeduplicationSimhash, # 根据海明距离进行文档去重
ReplaceEmails, # 去除电子邮件地址
ReplaceIdentifier, # 去除各种数字和字符标识符
ReplaceIp, # 去除 IP 地址
)
# 算子的具体含义请见 https://cloud.baidu.com/doc/WENXINWORKSHOP/s/8lp6irqen
result = ds.online_data_process([
RemoveEmoji(),
RemoveInvisibleCharacter(),
RemoveNonMeaningCharacters(),
RemoveWebIdentifiers(),
ReplaceTraditionalChineseToSimplified(),
ReplaceUniformWhitespace(),
FilterCheckSpecialCharacters(special_characters_max_cutoff=0.3),
FilterCheckWordRepetitionRemoval(word_repetition_max_cutoff=0.2),
DeduplicationSimhash(distance=5),
ReplaceEmails(),
ReplaceIdentifier(),
ReplaceIp(),
])
print(result)

查看任务结果

在数据清理任务完成之后,用户可以使用返回的字典中的 new_dataset_id 字段值,来从千帆获取到经过数据清洗后的数据集
  
  
  
  
  
  
#-# cell_skip
processed_dataset_id = result["new_dataset_id"]
processed_ds = Dataset.load(qianfan_dataset_id=processed_dataset_id)
print(processed_ds.list(0))
print(processed_ds.row_number())
评论
用户头像