Using Parallel File System (PFS) and Object Storage (BOS) on the BML Platform
Last updated: 2023-07-11
Using Parallel File System (PFS) on the BML Platform
The platform lets you attach Parallel File System (PFS) storage to a user resource pool as the data storage for modeling jobs. Job types that currently support PFS:
- Custom jobs: training jobs and auto-search jobs
Prerequisites
- You have attached Cloud Container Engine (CCE) resources on the platform as a user resource pool (click to learn more about CCE).
- You have created a Parallel File System (PFS) instance (click to learn more about PFS).
- The PFS instance can be accessed by the CCE resources, i.e. it is reachable from the corresponding VPC.
Mounting a PFS instance in a user resource pool
- Step 1: Go to Platform Management -> Resource Pool Management. A user resource pool that is already attached and running normally offers a "Mount PFS" action; click it to select the PFS instance to mount.
- Step 2: After the PFS instance has been attached, you can click "View PFS" to view it, and unmounting is also supported.
Submitting a modeling job with PFS as the data source
When submitting a modeling job, after selecting a user resource pool you can choose either the BOS storage or the PFS storage attached to that pool as the data source.
Note when mounting data and editing code: the platform can only use files under the mount path, so make sure all relevant files are placed under this mount path.
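As a minimal illustration (not part of the platform workflow), the sketch below assumes a hypothetical mount point /mnt/pfs; the actual mount point depends on how the PFS instance was mounted in your resource pool. It simply checks that the files a job will read sit under that mount path:

import os

# Hypothetical PFS mount point; replace it with the real mount point on your machine.
PFS_MOUNT = '/mnt/pfs'

# Files the job expects, written relative to the mount path.
required_files = [
    'job_model/paddle2.1.1/demo2.py',
    'job_model/paddle2.1.1/train_data/train-images-idx3-ubyte.gz',
    'job_model/paddle2.1.1/train_data/train-labels-idx1-ubyte.gz',
]

for rel_path in required_files:
    full_path = os.path.join(PFS_MOUNT, rel_path)
    # Anything missing here will also be invisible to the platform,
    # because only files under the mount path can be used.
    print(full_path, 'OK' if os.path.exists(full_path) else 'MISSING')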
Operation and code example
Prerequisites
A user resource pool on the platform has already mounted PFS successfully.
Operation
Click Custom Jobs -> Training Jobs -> New.
After selecting the user resource pool, fill in the algorithm configuration as follows:
Take the code file as an example: find the PFS mount point on the machine, upload the code and datasets into the PFS mount, and in the code file simply use paths relative to the mount path.
The content of demo2.py is attached below. It is the multi-node paddle 2.1.1 demo provided on the official website; the only change needed is to set the output path to a path in PFS relative to the mount path.
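For example (again assuming the hypothetical mount point /mnt/pfs from the sketch above), the correspondence between the files uploaded into the PFS mount and the relative paths used inside demo2.py might look like this:

import os

# Hypothetical PFS mount point on the training machine.
PFS_MOUNT = '/mnt/pfs'

# Paths as they appear in demo2.py: relative to the mount path.
train_data = './job_model/paddle2.1.1/train_data'
save_dir = './job_model/paddle2.1.1/output'

# The same locations seen through the mount point on the machine.
print(os.path.join(PFS_MOUNT, os.path.relpath(train_data)))  # /mnt/pfs/job_model/paddle2.1.1/train_data
print(os.path.join(PFS_MOUNT, os.path.relpath(save_dir)))    # /mnt/pfs/job_model/paddle2.1.1/output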
Code example
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
import gzip
import struct
import numpy as np
from PIL import Image
import time
import paddle
import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
import paddle.fluid as fluid
from paddle.io import Dataset
TEST_IMAGE = 't10k-images-idx3-ubyte.gz'
TEST_LABEL = 't10k-labels-idx1-ubyte.gz'
TRAIN_IMAGE = 'train-images-idx3-ubyte.gz'
TRAIN_LABEL = 'train-labels-idx1-ubyte.gz'
class MNIST(Dataset):
"""
    MNIST dataset loaded from local gzip files under data_dir.
"""
def __init__(self,
data_dir=None,
mode='train',
transform=None,
backend=None):
assert mode.lower() in ['train', 'test'], "mode should be 'train' or 'test', but got {}".format(mode)
if backend is None:
backend = paddle.vision.get_image_backend()
if backend not in ['pil', 'cv2']:
raise ValueError(
"Expected backend are one of ['pil', 'cv2'], but got {}"
.format(backend))
self.backend = backend
self.mode = mode.lower()
if self.mode == 'train':
self.image_path = os.path.join(data_dir, TRAIN_IMAGE)
self.label_path = os.path.join(data_dir, TRAIN_LABEL)
else:
self.image_path = os.path.join(data_dir, TEST_IMAGE)
self.label_path = os.path.join(data_dir, TEST_LABEL)
self.transform = transform
# read dataset into memory
self._parse_dataset()
self.dtype = paddle.get_default_dtype()
def _parse_dataset(self, buffer_size=100):
self.images = []
self.labels = []
with gzip.GzipFile(self.image_path, 'rb') as image_file:
img_buf = image_file.read()
with gzip.GzipFile(self.label_path, 'rb') as label_file:
lab_buf = label_file.read()
step_label = 0
offset_img = 0
# read from Big-endian
# get file info from magic byte
# image file : 16B
magic_byte_img = '>IIII'
magic_img, image_num, rows, cols = struct.unpack_from(
magic_byte_img, img_buf, offset_img)
offset_img += struct.calcsize(magic_byte_img)
offset_lab = 0
# label file : 8B
magic_byte_lab = '>II'
magic_lab, label_num = struct.unpack_from(magic_byte_lab,
lab_buf, offset_lab)
offset_lab += struct.calcsize(magic_byte_lab)
while True:
if step_label >= label_num:
break
fmt_label = '>' + str(buffer_size) + 'B'
labels = struct.unpack_from(fmt_label, lab_buf, offset_lab)
offset_lab += struct.calcsize(fmt_label)
step_label += buffer_size
fmt_images = '>' + str(buffer_size * rows * cols) + 'B'
images_temp = struct.unpack_from(fmt_images, img_buf,
offset_img)
images = np.reshape(images_temp, (buffer_size, rows *
cols)).astype('float32')
offset_img += struct.calcsize(fmt_images)
for i in range(buffer_size):
self.images.append(images[i, :])
self.labels.append(
np.array([labels[i]]).astype('int64'))
def __getitem__(self, idx):
image, label = self.images[idx], self.labels[idx]
image = np.reshape(image, [28, 28])
if self.backend == 'pil':
image = Image.fromarray(image.astype('uint8'), mode='L')
if self.transform is not None:
image = self.transform(image)
if self.backend == 'pil':
return image, label.astype('int64')
return image.astype(self.dtype), label.astype('int64')
def __len__(self):
return len(self.labels)
def mlp_model():
"""
    Build a simple MLP classifier for MNIST in static graph mode.
"""
x = paddle.static.data(name="x", shape=[64, 28, 28], dtype='float32')
y = paddle.static.data(name="y", shape=[64, 1], dtype='int64')
x_flatten = paddle.reshape(x, [64, 784])
fc_1 = nn.fc(x=x_flatten, size=128, activation='tanh')
fc_2 = nn.fc(x=fc_1, size=128, activation='tanh')
prediction = nn.fc(x=[fc_2], size=10, activation='softmax')
cost = paddle.fluid.layers.cross_entropy(input=prediction, label=y)
acc_top1 = paddle.metric.accuracy(input=prediction, label=y, k=1)
avg_cost = paddle.mean(x=cost)
res = [x, y, prediction, avg_cost, acc_top1]
return res
def train(epoch, exe, train_dataloader, cost, acc):
"""
    Run one training epoch.
"""
total_time = 0
step = 0
for data in train_dataloader():
step += 1
start_time = time.time()
loss_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
if step % 100 == 0:
end_time = time.time()
total_time += (end_time - start_time)
print(
"epoch: %d, step:%d, train_loss: %f, train_acc: %f, total time cost = %f, speed: %f"
% (epoch, step, loss_val[0], acc_val[0], total_time,
1 / (end_time - start_time) ))
def test(exe, test_dataloader, cost, acc):
"""
    Evaluate the model on the test set.
"""
total_time = 0
step = 0
for data in test_dataloader():
step += 1
start_time = time.time()
loss_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
if step % 100 == 0:
end_time = time.time()
total_time += (end_time - start_time)
print(
"step:%d, test_loss: %f, test_acc: %f, total time cost = %f, speed: %f"
% (step, loss_val[0], acc_val[0], total_time,
1 / (end_time - start_time) ))
def save(save_dir, feed_vars, fetch_vars, exe):
"""
    Save the inference model (only on the first worker).
"""
path_prefix = os.path.join(save_dir, 'model')
if fleet.is_first_worker():
paddle.static.save_inference_model(path_prefix, feed_vars, fetch_vars, exe)
if __name__ == '__main__':
    # Path to the training set
train_data = './job_model/paddle2.1.1/train_data'
    # Path to the validation set (the t10k files live in the same directory here)
test_data = './job_model/paddle2.1.1/train_data'
    # Output path (a relative path under the PFS mount)
save_dir = './job_model/paddle2.1.1/output'
    # Number of training epochs
epochs = 10
    # Run validation every N epochs
test_interval = 2
    # Save the model every N epochs
save_interval = 2
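    # The demo is written with the static graph API, so enable static mode first.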
paddle.enable_static()
paddle.vision.set_image_backend('cv2')
    # Training dataset
train_dataset = MNIST(data_dir=train_data, mode='train')
    # Validation dataset
test_dataset = MNIST(data_dir=test_data, mode='test')
    # Build the model
[x, y, pred, cost, acc] = mlp_model()
place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
    # Data loaders
train_dataloader = paddle.io.DataLoader(
train_dataset, feed_list=[x, y], drop_last=True,
places=place, batch_size=64, shuffle=True, return_list=False)
test_dataloader = paddle.io.DataLoader(
test_dataset, feed_list=[x, y], drop_last=True,
places=place, batch_size=64, return_list=False)
    # Initialize fleet for collective (multi-node) training
strategy = fleet.DistributedStrategy()
fleet.init(is_collective=True, strategy=strategy)
    # Set up the optimizer (wrapped by fleet for distributed training)
optimizer = paddle.optimizer.Adam()
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
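    # Create the executor on the selected device and run the startup program to initialize parameters.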
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())
prog = paddle.static.default_main_program()
for epoch in range(epochs):
train(epoch, exe, train_dataloader, cost, acc)
if epoch % test_interval == 0:
test(exe, test_dataloader, cost, acc)
# save model
if epoch % save_interval == 0:
save(save_dir, [x], [pred], exe)
Resource configuration (fill it in according to your code; the example above uses multiple nodes and GPU resources)