搜索本产品文档关键词
基础(必看):鉴权字符串生成
所有文档
menu
没有找到结果,请重新输入

对象存储 BOS

基础(必看):鉴权字符串生成

鉴权字符串生成

使用原始API进行访问BOS,需要自行计算鉴权信息。使用流程:

  1. 准备 ak/sk
  2. python3.6及以上环境

生成鉴权字符串完整demo

示例生成auth_keyGET请求访问Bucket为例

  • 鉴权字符串算法代码
import hashlib
import hmac
import string
import datetime


AUTHORIZATION = "authorization"
BCE_PREFIX = "x-bce-"
DEFAULT_ENCODING = 'UTF-8'


# 保存AK/SK的类
class BceCredentials(object):
    def __init__(self, access_key_id, secret_access_key):
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key


# 根据RFC 3986,除了:
#   1.大小写英文字符
#   2.阿拉伯数字
#   3.点'.'、波浪线'~'、减号'-'以及下划线'_'
# 以外都要编码
RESERVED_CHAR_SET = set(string.ascii_letters + string.digits + '.~-_')


def get_normalized_char(i):
    char = chr(i)
    if char in RESERVED_CHAR_SET:
        return char
    else:
        return '%%%02X' % i


NORMALIZED_CHAR_LIST = [get_normalized_char(i) for i in range(256)]


# 规范化字符串
def normalize_string(in_str, encoding_slash=True):
    if in_str is None:
        return ''

    # 如果输入是unicode,则先使用UTF8编码之后再编码
    # in_str = in_str.encode(DEFAULT_ENCODING) if isinstance(in_str, unicode) else str(in_str)
    in_str = in_str if isinstance(in_str, str) else str(in_str)

    # 在生成规范URI时。不需要对斜杠'/'进行编码,其他情况下都需要
    if encoding_slash:
        encode_f = lambda c: NORMALIZED_CHAR_LIST[ord(c)]
    else:
        # 仅仅在生成规范URI时。不需要对斜杠'/'进行编码
        encode_f = lambda c: NORMALIZED_CHAR_LIST[ord(c)] if c != '/' else c

    # 按照RFC 3986进行编码
    return ''.join([encode_f(ch) for ch in in_str])


# 生成规范URI
def get_canonical_uri(path):
    # 规范化URI的格式为:/{bucket}/{object},并且要对除了斜杠"/"之外的所有字符编码
    return normalize_string(path, False)


# 生成规范query string
def get_canonical_querystring(params):
    if params is None:
        return ''

    # 除了authorization之外,所有的query string全部加入编码
    result = ['%s=%s' % (k, normalize_string(v)) for k, v in params.items() if k.lower != AUTHORIZATION]

    # 按字典序排序
    result.sort()

    # 使用&符号连接所有字符串并返回
    return '&'.join(result)


# 生成规范header
def get_canonical_headers(headers, headers_to_sign=None):
    headers = headers or {}

    # 没有指定header_to_sign的情况下,默认使用:
    #   1.host
    #   2.content-md5
    #   3.content-length
    #   4.content-type
    #   5.所有以x-bce-开头的header项
    # 生成规范header
    if headers_to_sign is None or len(headers_to_sign) == 0:
        headers_to_sign = {"host", "content-md5", "content-length", "content-type"}

    # 对于header中的key,去掉前后的空白之后需要转化为小写
    # 对于header中的value,转化为str之后去掉前后的空白
    # f = lambda (key, value): (key.strip().lower(), str(value).strip())
    f = lambda item: (item[0].strip().lower(), str(item[1]).strip())

    result = []
    for k, v in map(f, headers.items()):
        # 无论何种情况,以x-bce-开头的header项都需要被添加到规范header中
        if k.startswith(BCE_PREFIX) or k in headers_to_sign:
            result.append("%s:%s" % (normalize_string(k), normalize_string(v)))

    # 按照字典序排序
    result.sort()

    # 使用\n符号连接所有字符串并返回
    return '\n'.join(result)


# 签名主算法
def sign(credentials, http_method, path, headers, params, expiration_in_seconds=1800, headers_to_sign=None):
    headers = headers or {}
    params = params or {}
    x_bce_date = headers.get("x-bce-date")
    # timestamp = int(datetime.datetime.strptime(x_bce_date, "%Y-%m-%dT%H:%M:%SZ").timestamp())
    # 1.生成sign key
    # 1.1.生成auth-string,格式为:bce-auth-v1/{accessKeyId}/{timestamp}/{expirationPeriodInSeconds}
    sign_key_info = 'bce-auth-v1/%s/%s/%d' % (
        credentials.access_key_id,
        x_bce_date,
        expiration_in_seconds)
    # 1.2.使用auth-string加上SK,用SHA-256生成sign key
    sign_key = hmac.new(
        credentials.secret_access_key.encode("utf-8"),
        sign_key_info.encode("utf-8"),
        hashlib.sha256).hexdigest()

    # 2.生成规范化uri
    canonical_uri = get_canonical_uri(path)

    # 3.生成规范化query string
    canonical_querystring = get_canonical_querystring(params)

    # 4.生成规范化header
    canonical_headers = get_canonical_headers(headers, headers_to_sign)

    # 5.使用'\n'将HTTP METHOD和2、3、4中的结果连接起来,成为一个大字符串
    string_to_sign = '\n'.join(
        [http_method, canonical_uri, canonical_querystring, canonical_headers])

    # 6.使用5中生成的签名串和1中生成的sign key,用SHA-256算法生成签名结果
    sign_result = hmac.new(sign_key.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    # 7.拼接最终签名结果串
    if headers_to_sign:
        # 指定header to sign
        result = '%s/%s/%s' % (sign_key_info, ';'.join(headers_to_sign), sign_result)
    else:
        # 不指定header to sign情况下的默认签名结果串
        result = '%s//%s' % (sign_key_info, sign_result)
    return result


def get_x_bce_date(date=None):
    """获取UTC格式的时间"""
    if date is None:
        format_string = "%Y-%m-%dT%H:%M:%SZ"
        date = datetime.datetime.utcnow().strftime(format_string)
    return date


def generate_bos_auth(path, ak, sk, method, headers, params, expire_seconds=1800):
    """生成bos的鉴权字符串,过期时间默认1800s"""
    credentials = BceCredentials(ak, sk)
    return sign(credentials, method, path, headers, params, expire_seconds, headers.keys())
  • 调用鉴权字符串
access_key = "AK"   
secret_key = "SK"
bucket_name = "bucketName"              # 替换为需要访问bucketName
object_key = "/aaa.png"                 # 在请求url没有参数时,访问的key的名字就是path,注意以 '/'开头
region = "bj.bcebos.com"                # bucket所在的区域,不同区域服务域名不同
request_method = "GET"                  # 请求方法类型 GET、PUT、HEAD、POST、DELETE
host = f"{bucket_name}.{region}"        # 根据bucket和服务域名组装访问域名
# 构造访问的header信息,可以根据需要进行额外添加,例如:content-type字段
header = {
    "host": host,
    "x-bce-date": get_x_bce_date()
}
# 如果请求在url中有带参数,也要参与鉴权
param = {}
# 生成鉴权字符串
auth_key = generate_bos_auth(object_key, access_key, secret_key, request_method, header, param)
# 构建完整的访问url
request_url = f"http://{host}{object_key}?{AUTHORIZATION}={auth_key}"

将<鉴权字符串算法代码><调用鉴权字符串>两部分代码组成一个py文件即可执行

注意:
  1. 生成的auth_key只是一个authorization字段,需要拼接为request_url后才能进行正常访问。
  2. request_url访问时需要带上x-bce-date字段,该字段尽量与生成签名时的x-bce-date字段相同,postman请求示例: image.png
  3. 更多详情参考生成签名字符串
上一篇
GET请求下载Bucket文件到本地
下一篇
BOS专线回源IDC最佳实践