基础(必看):鉴权字符串生成
更新时间:2022-12-01
鉴权字符串生成
使用原始API进行访问BOS,需要自行计算鉴权信息。使用流程:
- 准备 ak/sk
- python3.6及以上环境
生成鉴权字符串完整demo
示例生成auth_key
以GET
请求访问Bucket为例
- 鉴权字符串算法代码
import hashlib
import hmac
import string
import datetime
AUTHORIZATION = "authorization"
BCE_PREFIX = "x-bce-"
DEFAULT_ENCODING = 'UTF-8'
# 保存AK/SK的类
class BceCredentials(object):
def __init__(self, access_key_id, secret_access_key):
self.access_key_id = access_key_id
self.secret_access_key = secret_access_key
# 根据RFC 3986,除了:
# 1.大小写英文字符
# 2.阿拉伯数字
# 3.点'.'、波浪线'~'、减号'-'以及下划线'_'
# 以外都要编码
RESERVED_CHAR_SET = set(string.ascii_letters + string.digits + '.~-_')
def get_normalized_char(i):
char = chr(i)
if char in RESERVED_CHAR_SET:
return char
else:
return '%%%02X' % i
NORMALIZED_CHAR_LIST = [get_normalized_char(i) for i in range(256)]
# 规范化字符串
def normalize_string(in_str, encoding_slash=True):
if in_str is None:
return ''
# 如果输入是unicode,则先使用UTF8编码之后再编码
# in_str = in_str.encode(DEFAULT_ENCODING) if isinstance(in_str, unicode) else str(in_str)
in_str = in_str if isinstance(in_str, str) else str(in_str)
# 在生成规范URI时。不需要对斜杠'/'进行编码,其他情况下都需要
if encoding_slash:
encode_f = lambda c: NORMALIZED_CHAR_LIST[ord(c)]
else:
# 仅仅在生成规范URI时。不需要对斜杠'/'进行编码
encode_f = lambda c: NORMALIZED_CHAR_LIST[ord(c)] if c != '/' else c
# 按照RFC 3986进行编码
return ''.join([encode_f(ch) for ch in in_str])
# 生成规范URI
def get_canonical_uri(path):
# 规范化URI的格式为:/{bucket}/{object},并且要对除了斜杠"/"之外的所有字符编码
return normalize_string(path, False)
# 生成规范query string
def get_canonical_querystring(params):
if params is None:
return ''
# 除了authorization之外,所有的query string全部加入编码
result = ['%s=%s' % (k, normalize_string(v)) for k, v in params.items() if k.lower != AUTHORIZATION]
# 按字典序排序
result.sort()
# 使用&符号连接所有字符串并返回
return '&'.join(result)
# 生成规范header
def get_canonical_headers(headers, headers_to_sign=None):
headers = headers or {}
# 没有指定header_to_sign的情况下,默认使用:
# 1.host
# 2.content-md5
# 3.content-length
# 4.content-type
# 5.所有以x-bce-开头的header项
# 生成规范header
if headers_to_sign is None or len(headers_to_sign) == 0:
headers_to_sign = {"host", "content-md5", "content-length", "content-type"}
# 对于header中的key,去掉前后的空白之后需要转化为小写
# 对于header中的value,转化为str之后去掉前后的空白
# f = lambda (key, value): (key.strip().lower(), str(value).strip())
f = lambda item: (item[0].strip().lower(), str(item[1]).strip())
result = []
for k, v in map(f, headers.items()):
# 无论何种情况,以x-bce-开头的header项都需要被添加到规范header中
if k.startswith(BCE_PREFIX) or k in headers_to_sign:
result.append("%s:%s" % (normalize_string(k), normalize_string(v)))
# 按照字典序排序
result.sort()
# 使用\n符号连接所有字符串并返回
return '\n'.join(result)
# 签名主算法
def sign(credentials, http_method, path, headers, params, expiration_in_seconds=1800, headers_to_sign=None):
headers = headers or {}
params = params or {}
x_bce_date = headers.get("x-bce-date")
# timestamp = int(datetime.datetime.strptime(x_bce_date, "%Y-%m-%dT%H:%M:%SZ").timestamp())
# 1.生成sign key
# 1.1.生成auth-string,格式为:bce-auth-v1/{accessKeyId}/{timestamp}/{expirationPeriodInSeconds}
sign_key_info = 'bce-auth-v1/%s/%s/%d' % (
credentials.access_key_id,
x_bce_date,
expiration_in_seconds)
# 1.2.使用auth-string加上SK,用SHA-256生成sign key
sign_key = hmac.new(
credentials.secret_access_key.encode("utf-8"),
sign_key_info.encode("utf-8"),
hashlib.sha256).hexdigest()
# 2.生成规范化uri
canonical_uri = get_canonical_uri(path)
# 3.生成规范化query string
canonical_querystring = get_canonical_querystring(params)
# 4.生成规范化header
canonical_headers = get_canonical_headers(headers, headers_to_sign)
# 5.使用'\n'将HTTP METHOD和2、3、4中的结果连接起来,成为一个大字符串
string_to_sign = '\n'.join(
[http_method, canonical_uri, canonical_querystring, canonical_headers])
# 6.使用5中生成的签名串和1中生成的sign key,用SHA-256算法生成签名结果
sign_result = hmac.new(sign_key.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
# 7.拼接最终签名结果串
if headers_to_sign:
# 指定header to sign
result = '%s/%s/%s' % (sign_key_info, ';'.join(headers_to_sign), sign_result)
else:
# 不指定header to sign情况下的默认签名结果串
result = '%s//%s' % (sign_key_info, sign_result)
return result
def get_x_bce_date(date=None):
"""获取UTC格式的时间"""
if date is None:
format_string = "%Y-%m-%dT%H:%M:%SZ"
date = datetime.datetime.utcnow().strftime(format_string)
return date
def generate_bos_auth(path, ak, sk, method, headers, params, expire_seconds=1800):
"""生成bos的鉴权字符串,过期时间默认1800s"""
credentials = BceCredentials(ak, sk)
return sign(credentials, method, path, headers, params, expire_seconds, headers.keys())
- 调用鉴权字符串
access_key = "AK"
secret_key = "SK"
bucket_name = "bucketName" # 替换为需要访问bucketName
object_key = "/aaa.png" # 在请求url没有参数时,访问的key的名字就是path,注意以 '/'开头
region = "bj.bcebos.com" # bucket所在的区域,不同区域服务域名不同
request_method = "GET" # 请求方法类型 GET、PUT、HEAD、POST、DELETE
host = f"{bucket_name}.{region}" # 根据bucket和服务域名组装访问域名
# 构造访问的header信息,可以根据需要进行额外添加,例如:content-type字段
header = {
"host": host,
"x-bce-date": get_x_bce_date()
}
# 如果请求在url中有带参数,也要参与鉴权
param = {}
# 生成鉴权字符串
auth_key = generate_bos_auth(object_key, access_key, secret_key, request_method, header, param)
# 构建完整的访问url
request_url = f"http://{host}{object_key}?{AUTHORIZATION}={auth_key}"
将<鉴权字符串算法代码><调用鉴权字符串>两部分代码组成一个py文件即可执行
注意:
- 生成的
auth_key
只是一个authorization
字段,需要拼接为request_url
后才能进行正常访问。 request_url
访问时需要带上x-bce-date
字段,该字段尽量与生成签名时的x-bce-date
字段相同,postman请求示例:- 更多详情参考生成签名字符串。