Object的分块上传
更新时间:2025-04-01
分块上传的场景
除了通过putObject()方法上传文件到BOS以外,BOS还提供了另外一种上传模式:分块上传(Multipart Upload)。用户可以在如下的应用场景内(但不仅限于此),使用分块上传模式,如:
- 需要支持断点上传。
- 上传超过5GB大小的文件。
- 网络条件较差,和BOS的服务器之间的连接经常断开。
- 需要流式地上传文件。
- 上传文件之前,无法确定上传文件的大小。
Multipart Upload分块上传流程
假设有一个文件,本地路径为/path/to/file.zip
,由于文件比较大,使用分块上传其传输到BOS中。 基本流程:
- 初始化Multipart Upload。
- 上传分块。
- 完成分块上传。
初始化Multipart Upload
使用initiateMultipartUpload
方法来初始化一个分块上传事件:
let bucketName = "test-harmony-bucket"
let objectName = "test-multi-upload-object"
let args = new InitiateMultipartUploadArgs();
// 设置上传的object为标准存储类型
args.storageClass = "STANDARD";
// initialization
let initMultiUploadResult: InitiateMultipartUploadResult;
try {
// contentType参数设置为undefined占位, 使用默认值
initMultiUploadResult = await bosClient.initiateMultipartUpload(bucketName, objectName, undefined, args);
logger.info(`init multi upload success, info: ${JSON.stringify(initMultiUploadResult)}`)
} catch(bosResponse) {
logger.error(`errCode: ${bosResponse.error.code}`)
logger.error(`requestId: ${bosResponse.error.requestId}`)
logger.error(`errMessage: ${bosResponse.error.message}`)
logger.error(`statusCode: ${bosResponse.statusCode}`)
}
说明:
initiateMultipartUpload
的返回结果中含有UploadId
,它是区分分块上传事件的唯一标识,在后面的操作中,我们将用到它。
上传分块
let stat = fs.lstatSync("/path/to/file.zip");
let file = fs.openSync(filePath, fs.OpenMode.READ_ONLY);
if (stat.size < MAX_SINGLE_OBJECT_SIZE) { //BOS最大文件大小为48.8TB
return Promise.reject(fillBceError(`The file size exceeds ${MAX_SINGLE_OBJECT_SIZE}`));
}
// 如果文件小于一个块的大小,就不需要使用分块上传
if (stat.size < MIN_MULTIPART_SIZE || this._clientImpl.clientOptions.multiPartSize < MIN_MULTIPART_SIZE) {
return Promise.reject(fillBceError(`multipart size should not be less than ${MIN_MULTIPART_SIZE}`));
}
// 计算每块的大小的尺寸
let partSize = (this._clientImpl.clientOptions.multiPartSize + MULTIPART_ALIGN - 1) / MULTIPART_ALIGN * MULTIPART_ALIGN;
let partNum = Math.floor((stat.size + partSize - 1) / partSize);
// 如果块的数量超过10000,将重新计算每块的大小,不再使用默认的块大小
if (partNum > MAX_PART_NUMBER) {
partSize = (stat.size + MAX_PART_NUMBER - 1) / MAX_PART_NUMBER;
partSize = (partSize + MULTIPART_ALIGN - 1) / MULTIPART_ALIGN * MULTIPART_ALIGN;
partNum = Math.floor((stat.size + partSize - 1) / partSize);
}
logger.debug(`start to upload super file, total parts: ${partNum}, part size: ${partSize}`);
let args = new InitiateMultipartUploadArgs();
if (storageClass && isValidStorageClass(storageClass)) {
args.storageClass = storageClass;
}
// 使用初始化分块上传的结果返回的uploadId
let uploadId = initMultiUploadResult.uploadId as string;
// group task by partNum
let completeArgs = new CompleteMultipartUploadArgs();
completeArgs.partInfo = new PartInfo();
completeArgs.partInfo.parts = [];
let partIndex = 1;
// 依次上传每一块
while (partIndex <= partNum) {
logger.debug(`upload part: ${partIndex}`);
let uploadSize = partSize;
let offset = (partIndex - 1) * partSize;
if (uploadSize > stat.size - offset) {
uploadSize = stat.size - offset;
}
let data = new ArrayBuffer(uploadSize);
fs.readSync(file.fd, data, {offset: offset, length: uploadSize});
try {
let etag = await bosClient.uploadPart(bucketName, objectName, uploadId, partIndex, data);
// 每块上传成功后都需要记录eTag信息和partNumber信息
let uploadInfo = new UploadInfoType();
uploadInfo.partNumber = partIndex;
uploadInfo.eTag = etag;
completeArgs.partInfo.parts.push(uploadInfo);
} catch (bosResponse) { // 如果上传过程有失败的情况
logger.error(`upload error, info: ${JSON.stringify(bosResponse.error)}`)
await bosClient.abortMultipartUpload(bucketName, objectName, uploadId); //放弃本次分块上传
fs.closeSync(file);
return Promise.reject(bosResponse);
}
partIndex++;
}
fs.closeSync(file.fd); // 关闭本地文件
注意:
- UploadPart 方法要求Part大小是1MB的整数倍或大于5MB。但是Upload Part接口并不会立即校验上传Part的大小;只有当Complete Multipart Upload的时候才会校验。
- 为了保证数据在网络传输过程中不出现错误,建议您在UploadPart后,使用每个分块BOS返回的Content-MD5值分别验证已上传分块数据的正确性。当所有分块数据合成一个Object后,不再含MD5值。
- Part号码的范围是1~10000。如果超出这个范围,BOS将返回InvalidArgument的错误码。
- 每次上传Part之后,BOS的返回结果会包含一个 PartETag 对象,它是上传块的ETag与块编号(PartNumber)的组合,在后续完成分块上传的步骤中会用到它,因此需要将其保存起来。一般来讲这些 PartETag 对象将被保存到List中。
完成分块上传
let completeMultiUploadResult:CompleteMultipartUploadResult;
try {
completeMultiUploadResult = await bosClient.completeMultipartUpload(bucketName, objectName, uploadId, completeArgs);
} catch(bosResponse) {
logger.error(`complete multipart fail, info: ${JSON.stringify(bosResponse.error)}`)
await bosClient.abortMultipartUpload(bucketName, objectName, uploadId);
}
完整示例
该部分已经封装到BosClient.uploadSuperFile方法中,用户可以直接调用即可:
import { logger, Credential, BosClient, ClientOptions } from "bos"
import { CompleteMultipartUploadResult } from "bos/src/main/ets/bos/api/DataType"
let credential = new Credential(AccessKeyID, SecretAccessKey, Token); //STS返回的临时AK/SK及Token
let clientOptions = new ClientOptions();
clientOptions.endpoint = "bj.bcebos.com"; //传入Bucket所在区域域名
let bosClient = new BosClient(credential, clientOptions); // 创建BosClient
let bucketName = "test-harmony-bucket"
let objectName = "multi_upload_object.txt"
let path = "/path/to/file.zip"
let result: CompleteMultipartUploadResult;
try {
result = await bosClient.uploadSuperFile(bucketName, objectName, path);
logger.info(`upload super file success, result : ${JSON.stringify(result)}`);
} catch (bosResponse) {
logger.error(`errCode: ${bosResponse.error.code}`)
logger.error(`requestId: ${bosResponse.error.requestId}`)
logger.error(`errMessage: ${bosResponse.error.message}`)
logger.error(`statusCode: ${bosResponse.statusCode}`)
}
取消分块上传
用户可以使用abortMultipartUpload方法取消分块上传。
try {
await bosClient.abortMultipartUpload(bucketName, objectName, uploadId);
} catch(bosResponse) {
logger.error(`errCode: ${bosResponse.error.code}`)
logger.error(`requestId: ${bosResponse.error.requestId}`)
logger.error(`errMessage: ${bosResponse.error.message}`)
logger.error(`statusCode: ${bosResponse.statusCode}`)
}
获取未完成的分块上传
用户可以使用listMultipartUploads
方法获取Bucket内未完成的分块上传事件。
示例代码
try {
let listMultipartUploadsResult = await bosClient.listMultipartUploads(bucketName);
for (let multiUpload of listMultipartUploadsResult.uploads as ListMultipartUploadsType[]) {
logger.info(`key: ${multiUpload.key}, uploadId: ${multiUpload.uploadId}`);
}
} catch(bosResponse) {
logger.error(`errCode: ${bosResponse.error.code}`)
logger.error(`requestId: ${bosResponse.error.requestId}`)
logger.error(`errMessage: ${bosResponse.error.message}`)
logger.error(`statusCode: ${bosResponse.statusCode}`)
}
注意:
- 默认情况下,如果Bucket中的分块上传事件的数目大于1000,则只会返回1000个Object,并且返回结果中IsTruncated的值为True,同时返回NextKeyMarker作为下次读取的起点。
- 若想获取更多分块上传事件,可以访问ListMultipartUploadsResult实例中的keyMarker参数分次读取。
获取所有已上传的分块信息
用户可以使用listParts
方法获取某个上传事件中所有已上传的块。
示例代码
try {
let listPartsResult = await bosClient.listParts(bucketName, objectName, "e8583fb592ec699d7ecca085fc46fdd8");
for (let part of listPartsResult.parts as ListPartType[]) {
logger.info(`partNumber: ${part.partNumber}, eTag: ${part.eTag}`);
}
} catch(bosResponse) {
logger.error(`errCode: ${bosResponse.error.code}`)
logger.error(`requestId: ${bosResponse.error.requestId}`)
logger.error(`errMessage: ${bosResponse.error.message}`)
logger.error(`statusCode: ${bosResponse.statusCode}`)
}
注意:
- 默认情况下,如果Bucket中的分块上传事件的数目大于1000,则只会返回1000个Object,并且返回结果中IsTruncated的值为True,同时返回NextPartNumberMarker作为下次读取的起点。
- 若想获取更多已上传的分块信息,可以访问ListPartsResult中的partNumberMarker参数分次读取。