上传文件
在BOS中,用户操作的基本数据单元是Object。Bucket中的Object数量不限,但单个Object最大允许存储5TB的数据。Object包含Key、Meta和Data。其中,Key是Object的名字;Meta是用户对该Object的描述,由一系列Name-Value对组成;Data是Object的数据。
BOS Python SDK提供了丰富的文件上传接口,可以通过以下方式上传文件:
- 简单上传
- 追加上传
- 分片上传
- 断点续传上传
- 获取上传进度
Object的命名规范如下:
- 使用UTF-8编码。
- 长度必须在1-1023字节之间。
- 首字母不能为'/',不能包含'@'字符,'@'用于图片处理接口。
简单上传
BOS在简单上传的场景中,支持以指定文件形式、以数据流方式、以字符串方式执行Object上传,请参考如下代码:
-
如下代码可以进行Object上传:
data = open(file_name, 'rb') #以数据流形式上传Object,用户需要自行计算数据长度content_length #用户需自行计算content_md5.计算方法是对数据执行md5算法获取128位二进制数据,再进行base64编码 bos_client.put_object(bucket_name, object_key, data, content_length,content_md5) #从字符串中上传的Object bos_client.put_object_from_string(bucket_name, object_key, string) #从文件中上传的Object bos_client.put_object_from_file(bucket_name, object_key, file_name)
其中,data为流对象,不同类型的Object采用不同的处理方法,从字符串中的上传使用StringIO的返回,从文件中的上传使用open()的返回,因此BOS提供了封装好的接口方便用户进行快速上传。
Object以文件的形式上传到BOS中,put_object相关接口均支持不超过5GB的Object上传。在put_object、put_object_from_string或者put_object_from_file请求处理成功后,BOS会在Header中返回Object的ETag作为文件标识。
这些接口均有可选参数:
参数 | 说明 |
---|---|
content_type | 上传文件或字符串的类型 |
content_md5 | 文件数据校验,设置后BOS会启用文件内容MD5校验,把您提供的MD5与文件的MD5比较,不一致会抛出错误 |
content_length | 定义文件长度,put_object_from_string()不包含该参数 |
content_sha256 | 用于进行文件校验 |
user_metadata | 用户自定义元数据 |
storage_class | 设置文件存储类型 |
user_headers | 用户定义header |
content_md5计算方法是对数据执行md5算法获取128位二进制数据,再进行base64编码。示例如下:
import io
import hashlib
import base64
file_name = "your_file"
buf_size = 8192
with open(file_name, 'rb') as fp:
md5 = hashlib.md5()
while True:
bytes_to_read = buf_size
buf = fp.read(bytes_to_read)
if not buf:
break
md5.update(buf)
content_md5 = base64.standard_b64encode(md5.digest())
设置文件元信息
文件元信息(Object Meta),是对用户在向BOS上传文件时,同时对文件进行的属性描述,主要分为分为两种:设置HTTP标准属性(HTTP Headers)和用户自定义的元信息。
设定Object的Http Header BOS Python SDK本质上是调用后台的HTTP接口,因此用户可以在上传文件时自定义Object的Http Header。常用的http header说明如下:
名称 | 描述 | 默认值 |
---|---|---|
Cache-Control | 指定该Object被下载时的网页的缓存行为 | 无 |
Content-Encoding | 表示消息主体进行了何种方式的内容编码转换 | 无 |
Content-Disposition | 指示MINME用户代理如何显示附加的文件,打开或下载,及文件名称 | 无 |
Expires | 缓存过期时间 | 无 |
参考代码如下:
-
从字符串中上传带有特定header的object
user_headers = {"header_key":"header_value"} #从字符串中上传带有特定header的object bos_client.put_object_from_string(bucket=bucket_name, key=object_key, data=string, user_headers=user_headers) #从文件中上传带有特定header的object bos_client.put_object_from_file(bucket=bucket_name, key=object_key, file_name=file, user_headers=user_headers)
用户自定义元信息
BOS支持用户自定义元数据来对Object进行描述。如下代码所示:
#用户自定义元数据
user_metadata = {"name":"my-data"}
#从字符串中上传带有用户自定义meta的object
bos_client.put_object_from_string(bucket=bucket_name,
key=object_key,
data=string,
user_metadata=user_metadata)
#从文件中上传带有用户自定义meta的object
bos_client.put_object_from_file(bucket=bucket_name,
key=object_key,
file_name=file,
user_metadata=user_metadata)
提示:
- 在上面代码中,用户自定义了一个名字为”name”,值为”my-data”的元数据
- 当用户下载此Object的时候,此元数据也可以一并得到
- 一个Object可以有多个类似的参数,但所有的User Meta总大小不能超过2KB
设置Object的Copy属性
BOS同时会提供copy_object接口用于将一个已经存在的Object拷贝到另外一个Object,拷贝过程中会对源Object的Etag或修改状态进行判断,根据判断结果决定是否执行拷贝。详细的参数解释如下:
名称 | 类型 | 描述 | 是否必需 |
---|---|---|---|
x-bce-copy-source-if-match | String | 如果源Object的ETag值和用户提供的ETag相等,则执行拷贝操作,否则拷贝失败。 | 否 |
x-bce-copy-source-if-none-match | String | 如果源Object的ETag和用户提供的ETag不相等,则执行拷贝操作,否则拷贝失败。 | 否 |
x-bce-copy-source-if-unmodified-since | String | 如果源object在x-bce-copy-source-if-unmodified-since之后没被修改,则执行拷贝操作,否则拷贝失败。 | 否 |
x-bce-copy-source-if-modified-since | String | 如果源object在x-bce-copy-source-if-modified-since之后被修改了,则执行拷贝操作,否则拷贝失败。 | 否 |
对应的示例代码:
copy_object_user_headers = {"copy_header_key":"copy_header_value"}
bos_client.copy_object(source_bucket_name = bucket_name,
source_key = object_name,
target_bucket_name = bucket_name,
target_key = object_name,
user_metadata = user_metadata,
user_headers = user_headers,
copy_object_user_headers = copy_object_user_headers)
上传Object时设置存储类型
BOS支持标准存储, 低频存储,冷存储和归档存储,上传Object并存储为某种存储类型时通过指定StorageClass实现,默认为标准存储,四种存储类型对应的参数如下:
存储类型 | 参数 |
---|---|
标准存储 | STANDARD |
低频存储 | STANDARD_IA |
冷存储 | COLD |
归档存储 | ARCHIVE |
以低频存储和归档存储为例,代码如下:
from baidubce.services.bos import storage_class
#从文件中上传冷存储类型的Object
bos_client.put_object_from_file(bucket=bucket_name,
key=object_key,
file_name=file,
storage_class=storage_class.COLD)
#从字符串上传冷存储类型的Object
bos_client.put_object_from_string(bucket=bucket_name,
key=object_key,
data=string,
storage_class=storage_class.COLD)
#从文件中上传归档存储类型的Object
bos_client.put_object_from_file(bucket=bucket_name,
key=object_key,
file_name=file,
storage_class=storage_class.ARCHIVE)
追加上传
上文介绍的简单上传方式,创建的Object都是Normal类型,用户不可再进行追加写,这在日志、视频监控、视频直播等数据复写较频繁的场景中使用不方便。
正因如此,百度智能云BOS支持AppendObject,即以追加写的方式上传文件。通过AppendObject操作创建的Object类型为Appendable Object,可以对该Object追加数据。AppendObject大小限制为0~5G。归档存储类型不支持追加上传。
通过AppendObject方式上传示例代码如下:
#上传appendable object。其中“content_md5(data)”表示需要用户自行计算上传数据的md5值。
#content_md5计算方法是对数据执行md5算法获取128位二进制数据,再进行base64编码。示例见上文“简单上传”部分。
#其中“content_length(data)”表示需要用户自行计算上传数据的长度
response = bos_client.append_object(bucket_name=bucket_name,
key=object_key,
data=data,
content_md5=content_md5(data), content_length=content_length(data))
#获取下次追加写的位置
next_offset = response.metadata.bce_next_append_offset
bos_client.append_object(bucket_name=bucket_name,
key=object_key,
data=next_data,
content_md5=content_md5(next_data), content_length=content_length(next_data),
offset=next_offset)
#从字符串上传appendable object
from baidubce.services.bos import storage_class
bos_client.append_object_from_string(bucket_name=bucket_name,
key=object_key,
data=string,
offset=offset,
storage_class=storage_class.STANDARD,
user_headers=user_headers)
分块上传
除了通过putObject接口上传文件到BOS以外,BOS还提供了另外一种上传模式 —— Multipart Upload。用户可以在如下的应用场景内(但不仅限于此),使用Multipart Upload上传模式,如:
- 需要支持断点上传。
- 上传超过5GB大小的文件。
- 网络条件较差,和BOS的服务器之间的连接经常断开。
- 需要流式地上传文件。
- 上传文件之前,无法确定上传文件的大小。
下面将介绍分步实现Multipart Upload。
初始化Multipart Upload
BOS使用initiate_multipart_upload方法来初始化一个分块上传事件:
upload_id = bos_client.initiate_multipart_upload(bucket_name, object_key).upload_id
该方法会返回InitMultipartUploadResponse对象,此对象中包含uploadId参数,用来表示此次的上传事件。
带有特定header的分块上传的初始化
bos_client.initiate_multipart_upload(bucket_name=bucket,
key=object_key,
user_headers=user_headers)
其中,header可设置的属性有:"Cache-Control"、"Content-Encoding"、"Content-Disposition"、"Expires",get-object和get-object-meta两个接口会返回设置的这四个header。
低频、冷存储、归档存储存储分块上传的初始化
低频存储分块上传的初始化需要指定storage_class
,请参考以下代码(冷存储以此类推):
from baidubce.services.bos import storage_class
bos_client.initiate_multipart_upload(bucket_name=bucket,
key=object_key,
storage_class = storage_class.STANDARD_IA)
上传分块
初始化完成后,进行分块上传:
left_size = os.path.getsize(file_name)
# left_size用于设置分块开始位置
# 设置分块的开始偏移位置
offset = 0
part_number = 1
part_list = []
while left_size > 0:
# 设置每块为5MB
part_size = 5 * 1024 * 1024
if left_size < part_size:
part_size = left_size
response = bos_client.upload_part_from_file(
bucket_name, object_key, upload_id, part_number, part_size, file_name, offset)
left_size -= part_size
offset += part_size
part_list.append({
"partNumber": part_number,
"eTag": response.metadata.etag
})
part_number += 1
注意:
- offset参数以字节为单位,为分块的开始偏移位置。
- size参数以字节为单位,定义每个分块的大小,除最后一个Part以外,其他的Part大小都要大于5MB。但是Upload Part接口并不会立即校验上传Part的大小;只有当调用complete_multipart_upload()的时候才会校验。
- 为了保证数据在网络传输过程中不出现错误,建议您在Upload Part后,使用每个分块BOS返回的Content-MD5值分别验证已上传分块数据的正确性。当所有分块数据合成一个Object后,不再含MD5值。
- Part号码的范围是1~10000。如果超出这个范围,BOS将返回InvalidArguement的错误码。
- 每次上传Part时都要把流定位到此次上传块开头所对应的位置。
- 每次上传Part之后,BOS的返回结果会包含一个etag与块编号(partNumber),在后续完成分块上传的步骤中会用到它,因此需要将其保存起来。一般来讲这些 etag和partNumber将被保存到List中。
完成分块上传
bos_client.complete_multipart_upload(bucket_name, object_key, upload_id, part_list)
其中,part_list类型是list,里面每个元素是个dict,每个dict包含两个关键字,一个是partNumber, 一个是eTag。
示例如下:
[{'partNumber': 1, 'eTag': 'f1c9645dbc14efddc7d8a322685f26eb'}, {'partNumber': 2, 'eTag': 'f1c9645dbc14efddc7d8a322685f26eb'}, {'partNumber': 3, 'eTag': '93b885adfe0da089cdf634904fd59f71'}]
该方法返回的解析类中可供调用的参数有:
参数 | 说明 |
---|---|
bucket | Bucket名称 |
key | Object名称 |
e_tag | 每个上传分块的ETag |
location | Object的URL |
注意:此对象中包含的ETag是上传分块过程中每个Part的ETag,BOS收到用户提交的Part列表后,会逐一验证每个数据Part的有效性。当所有的数据Part验证通过后,BOS将把这些数据part组合成一个完整的Object。
取消分块上传事件
用户可以使用abort_multipart_upload方法取消分块上传:
bos_client.abort_multipart_upload(bucket_name, object_key, upload_id = upload_id)
获取未完成的分块上传事件
用户可以使用如下两种方法获取Bucket中未完成的分块上传事件:
方法一:
response = bos_client.list_multipart_uploads(bucket_name)
for item in response.uploads:
print(item.upload_id)
list_multipart_uploads
每次BOS最多返回1000个Multipart Upload,BOS支持prefix和delimiter过滤。
list_multipart_uploads
方法可供调用的参数还有:
名称 | 类型 | 描述 | 是否必需 |
---|---|---|---|
delimiter | String | 分隔符; 主要应此项实现list文件夹的逻辑 | 否 |
key_marker | String | Object按照字典序排序后,本次从keyMarker的后面的一条开始返回 | 否 |
max_uploads | Int | 本次请求返回Multipart Uploads的最大数目,默认1000,最大1000 | 否 |
prefix | String | key前缀,限定返回的object key必须以此为前缀 | 否 |
list_multipart_uploads
方法返回的解析类中可供调用的参数有:
参数 | 说明 |
---|---|
bucket | Bucket名称 |
key_marker | 开始上传的分块Object名称 |
next_key_marker | 当指定了delimiter且IsTruncated true时,才返回此项,作为下次查询marker的值 |
is_truncated | 指明是否所有查询都返回了;false-本次已经返回所有结果,true-本次还没有返回所有结果 |
prefix | 匹配以prefix开始到第一次出现Delimiter字符之间的object作为一组元素返回 |
common_prefixes | 仅当指定delimiter,才会返回此项 |
delimiter | 查询的结束符 |
max_uploads | 请求返回的最大数目 |
uploads | 全部未完成的分快上传事件容器 |
+owner | 对应Bucket所属用户信息 |
+id | Bucket Owner的用户id |
+display_name | Bucket Owner的名称 |
+key | 分块所属Object名称 |
+upload_id | 分块上传id |
+initiated | 分块上传开始时间 |
list_all_multipart_uploads
方法返回uploads的生成器(Generator),并且不受单次最大返回1000个结果的限制,会返回所有的结果。
方法二:
uploads = bos_client.list_all_multipart_uploads(bucket_name)
for item in uploads:
print(item.upload_id)
获取所有已上传的块信息
用户可以使用如下两种方法获取某个上传事件中所有已上传的块:
方法一:
response = bos_client.list_parts(bucket_name, object_key, upload_id)
for item in response.parts:
print(item.part_number)
注意:
- BOS按照PartNumber升序排序。
- 由于网络传输可能出错,所以不推荐用ListParts出来的结果生成最后CompleteMultipartUpload的Part列表。
list_parts
方法可供调用的参数还有:
名称 | 类型 | 描述 | 是否必需 |
---|---|---|---|
max_parts | Int | BOS一次最多返回的part数目,默认1000,最大1000 | 否 |
part_number_marker | Int | 按照partNumber排序,本次请求的起始part从此partNumber的下一个开始返回 | 否 |
list_parts
方法返回的解析类中可供调用的参数有:
参数 | 说明 |
---|---|
bucket | Bucket名称 |
key | Object名称 |
initiated | 本次分块上传开始时间 |
max_parts | 请求返回的最大数目 |
is_truncated | 指明是否所有查询都返回了;false-本次已经返回所有结果,true-本次还没有返回所有结果 |
storage_class | Object的存储类型,目前分为标准类型STANDARD , 低频类型STANDARD_IA 、冷存储类型COLD 和归档类型ARCHIVE |
part_number_marker | 分块开始标记位 |
parts | 分块列表,list类型 |
+part_number | 分块编号 |
+last_modified | 此分块最后一次被修改的时间 |
+e_tag | 每个上传分块的ETag |
+size | 分块内容的大小(字节数) |
upload_id | 本次分块上传的id |
owner | 对应bucket所属用户信息 |
+id | Bucket owner的用户id |
+display_name | Bucket owner的名称 |
next_part_number_marker | 本次请求返回的最后一条记录的partNumber,可以作为下一次请求的part_number_marker |
方法二:
parts = bos_client.list_all_parts(bucket_name, object_key, upload_id = upload_id)
for item in parts:
print(item.part_number)
list_all_parts
方法返回parts的生成器(Generator),并且不受单次最大返回1000个结果的限制,会返回所有的结果。
获取分块上传的Object的存储类型
response = bos_client.list_parts(bucket_name=bucket,
key=object_key,
upload_id=upload_id)
print(response.storage_class)
封装分块上传
在Python SDK中,BOS为用户提供了put_super_obejct_from_file()接口,它对分块上传涉及到的initiate_multipart_upload、upload_part_from_file、complete_multipart_upload三个方法进行封装,用户只需调用该接口即可完成分块上传。
import multiprocessing
file_name = "/path/to/file.zip"
result = bos_client.put_super_obejct_from_file(bucket_name, key, file_name,
chunk_size=5, thread_num=multiprocessing.cpu_count())
if result:
print("Upload success!")
方法可供调用的参数还有:
名称 | 类型 | 描述 | 是否必需 |
---|---|---|---|
chunk_size | int | 分块大小,单位MB。默认为5MB | 否 |
thread_num | int | 分块上传中线程池中线程的数量,默认等于CPU的核数 | 否 |
若一个大文件耗时很长,用户想结束分块上传,可调用UploadTaskHandle中的cancel()方法实现取消分块上传操作。示例如下:
import threading
from baidubce.services.bos.bos_client import UploadTaskHandle
file_name = "/path/to/file.zip"
uploadTaskHandle = UploadTaskHandle()
t = threading.Thread(target=bos_client.put_super_obejct_from_file, args=(bucket_name, key, file_name),
kwargs={
"chunk_size": 5,
"thread_num": multiprocessing.cpu_count(),
"uploadTaskHandle": uploadTaskHandle
})
t.start()
time.sleep(2)
uploadTaskHandle.cancel()
t.join()
断点续传上传
当用户向BOS上传大文件时,如果网络不稳定或者遇到程序崩等情况,则整个上传就失败了,失败前已经上传的部分也作废,用户不得不重头再来。这样做不仅浪费资源,在网络不稳定的情况下,往往重试多次还是无法完成上传。 基于上述场景,BOS提供了断点续传上传的能力:
- 当网络情况一般的情况下,建议使用三步上传方式,将object分为1Mb的块,参考分块上传。
- 当您的网络情况非常差,推荐使用appendObject的方式进行断点续传,每次append 较小数据256kb,参考追加上传。
提示
- 断点续传是分片上传的封装和加强,是用分片上传实现的;
- 文件较大或网络环境较差时,推荐使用分片上传;
抓取上传
如下代码用于从指定URL抓取资源,并将资源存储到指定的Bucket中。此操作需要请求者对该Bucket有写权限,每次只能抓取一个Object,且用户可以自定义Object的名称,默认同步抓取。详情可参考FetchObject接口。
from baidubce.services.bos.bos_client import FETCH_MODE_ASYNC
from baidubce.services.bos import storage_class
fetch_url="<YOUR_URL>"
// 默认同步抓取
bos_client.fetch_object(bucket_name, object_key, fetch_url)
// 异步抓取
response = bos_client.fetch_object(bucket_name,
object_key,
fetch_url,
fetch_mode=FETCH_MODE_ASYNC,
storage_class=storage_class.COLD)
print("jobId:{}, return code:{}, return message:{}".format(response.job_id,
response.code,
response.message))
获取上传进度
Python SDK支持在上传过程中实时提供上传进度信息。目前支持简单上传,追加上传,分块上传。需要在对应接口上增加progress_callback参数,并提供进度条回调函数,也可调用工具类中的默认进度条回调函数。
回调函数示例如下:
def percentage(consumed_bytes, total_bytes):
"""进度条回调函数,计算当前完成的百分比
:param consumed_bytes: 已经上传/下载的数据量
:param total_bytes: 总数据量
"""
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate))
sys.stdout.flush()
# progress_callback为可选参数,用于实现进度条功能.
bos_client.put_object(bucket_name, object_key, data, content_length,content_md5, progress_callback=percentage)
推荐使用工具类中的默认进度条回调函数(utils.default_progress_callback),目前支持百分比和进度条展示,示例如下:
# 引入 SDK 工具类包
from baidubce import utils
# progress_callback为可选参数,用于实现进度条功能.
bos_client.put_object(bucket_name, object_key, data, content_length,content_md5, progress_callback=utils.default_progress_callback)
- put_object 示例代码
# 引入 SDK 工具类包
from baidubce import utils
# progress_callback为可选参数,用于实现进度条功能.
data = open(file_name, 'rb')
bos_client.put_object(bucket_name, object_key, data, content_length, content_md5,
progress_callback=utils.default_progress_callback)
#从字符串中上传的Object
bos_client.put_object_from_string(bucket_name, object_key, string,
progress_callback=utils.default_progress_callback)
#从文件中上传的Object
bos_client.put_object_from_file(bucket_name, object_key, file_name,
progress_callback=utils.default_progress_callback)
- append_object 示例代码
# 引入 SDK 工具类包
from baidubce import utils
# progress_callback为可选参数,用于实现进度条功能.
#上传appendable object。
response = bos_client.append_object(bucket_name=bucket_name,
key=object_key,
data=data,
content_md5=content_md5(data), content_length=content_length(data),
progress_callback=utils.default_progress_callback)
#从字符串上传appendable object
result = bos_client.append_object_from_string(bucket_name=bucket_name,
key=object_key,
data=String, progress_callback=utils.default_progress_callback)
- upload_part_from_file 示例代码
# 引入 SDK 工具类包
from baidubce import utils
# progress_callback为可选参数,用于实现进度条功能.
bos_client.upload_part_from_file(
bucket_name, key, upload_id, part_number, part_size, file_name, offset, progress_callback=utils.default_progress_callback)~~~~
支持单链接限速
对象存储BOS对单Bucket的公网带宽阈值为10Gbit/s,内网带宽阈值为50Gbit/s,当用户的上传或下载占用带宽达到带宽限制阈值时,会返回RequestRateLimitExceeded的错误码。为保证用户能够正常使用服务,BOS支持在进行上传、下载等行为时进行流量控制,保证大流量服务占用带宽不会对其他应用服务造成影响。
上传类请求接口示例
限速值的取值范围为819200~838860800,单位为bit/s,即100KB/s~100MB/s。限速值取值必须为数字,BOS将按照指定的限速值对此次请求进行限速,当限速值不在此范围或不合法时将返回400错误码。
traffic_limit_speed = 819200 * 5
# put a file as object
_create_file(file_name, 5 * 1024 * 1024)
bos_client.put_object_from_file(bucket_name, key, file_name, traffic_limit=traffic_limit_speed)
# multi-upload operation samples
# put a super file to object
_create_file(file_name, 10 * 1024 * 1024)
# SuperFile step 1: init multi-upload
upload_id = bos_client.initiate_multipart_upload(bucket_name, key).upload_id
# SuperFile step 2: upload file part by part
left_size = os.path.getsize(file_name)
offset = 0
part_number = 1
part_list = []
while left_size > 0:
part_size = 5 * 1024 * 1024
if left_size < part_size:
part_size = left_size
response = bos_client.upload_part_from_file(bucket_name, key, upload_id, part_number, part_size, file_name, offset, traffic_limit=traffic_limit_speed)
left_size -= part_size
offset += part_size
# your should store every part number and etag to invoke complete multi-upload
part_list.append({
"partNumber": part_number,
"eTag": response.metadata.etag
})
part_number += 1
# copy a object
bos_client.copy_object(source_bucket, source_key, target_bucket, target_key, traffic_limit=traffic_limit_speed)
# append object
bos_client.append_object(bucket_name, key, traffic_limit=traffic_limit_speed)
# upload part copy
bos_client.upload_part_copy(source_bucket, source_key, target_bucket, target_key, upload_id, part_number,
part_size, offset, traffic_limit=traffic_limit_speed)