通用说明
更新时间:2024-07-17
API认证机制
所有API的安全认证一律采用Access Key与请求签名机制。Access Key由Access Key ID和Secret Access Key组成,均为字符串。对于每个HTTP请求,使用下面所描述的算法生成一个认证字符串。提交认证放在Authorization头域里。服务端根据生成算法验证认证字符串的正确性。认证字符串的格式为bce-auth-v{version}/{accessKeyId}/{timestamp}/{expirationPeriodInSeconds}/{signedHeaders}/{signature}。
- version是正整数,目前取值为1。
- timestamp 是生成签名时的 UTC 时间。
- expirationPeriodInSeconds表示签名有效期限。
- signedHeaders 是签名算法中涉及到的头域列表。头域名之间用分号(;)分隔,如 host;x-bce-date。列表按照字典序排列。(本 API 签名仅使用 host 和 x-bce-date 两个 header)
- signature是256位签名的十六进制表示,由64个小写字母组成。其生成算法见下。
当百度智能云接收到用户的请求后,系统将使用相同的 SK 和同样的认证机制生成认证字符串,并与用户请求中包含的认证字符串进行对比。如果认证字符串相同,系统认为用户拥有指定的操作权限,并执行相关操作;如果认证字符串不同,系统将忽略该操作并返回错误码。鉴权认证机制的详细内容请参见鉴权认证机制。
鉴权示例
python实现签名参考脚本
import datetime
import hmac, hashlib
import time
import urllib.parse
import requests
def get_headers(url, method: str, ak, sk):
domain = urllib.parse.urlparse(url).netloc
uri = urllib.parse.urlparse(url).path
gcloud_params = {}
if url.find('?') > 0:
tmp = url.split('?')[1]
for item in tmp.split('&'):
if item.find('=') > 0:
gcloud_params[item.split('=')[0]] = item.split('=')[1]
else:
gcloud_params[item] = ""
gcloud_uri = url.split('?')[0]
headers = {"x-bce-date": get_canonical_time(), "Content-Type": "application/json", "Host": domain} #
bce_request = {
'uri': uri, # f"http://{domain}:8793/api/cce/service/v2/cluster/cce-lksgpvx5"
'params': gcloud_params,
'method': method.upper(),
'headers': headers
}
# print(bce_request)
auth = gen_authorization(bce_request, ak, sk)
headers['Authorization'] = get_utf8_value(auth)
return headers
def get_canonical_time(timestamp=0):
"""
Get cannonical time.
:type timestamp: int
:param timestamp: None
=======================
:return:
**string of canonical_time**
"""
if timestamp == 0:
utctime = datetime.datetime.utcnow()
else:
utctime = datetime.datetime.utcfromtimestamp(timestamp)
return "%04d-%02d-%02dT%02d:%02d:%02dZ" % (
utctime.year, utctime.month, utctime.day,
utctime.hour, utctime.minute, utctime.second)
def get_canonical_time(timestamp=0):
"""
Get cannonical time.
:type timestamp: int
:param timestamp: None
=======================
:return:
**string of canonical_time**
"""
if timestamp == 0:
utctime = datetime.datetime.utcnow()
else:
utctime = datetime.datetime.utcfromtimestamp(timestamp)
return "%04d-%02d-%02dT%02d:%02d:%02dZ" % (
utctime.year, utctime.month, utctime.day,
utctime.hour, utctime.minute, utctime.second)
def gen_authorization(request, ak, sk, timestamp=None, expire_period=1800):
"""
generate authorization string
if not specify timestamp, then use current time;
"""
signedheaders = []
if "headers" in request:
signedheaders = list(key.lower() for key in request["headers"].keys() if key != '')
signedheaders.sort()
authorization = build_authorization(ak, signedheaders, expire_period, timestamp)
signingkey = _calc_signingkey(authorization, sk)
signature = _calc_signature(signingkey, request, signedheaders)
authorization["signature"] = signature
return serialize_authorization(authorization)
def serialize_authorization(auth):
"""
serialize Authorization object to authorization string
"""
val = "/".join((auth['version'], auth['access'], auth['timestamp'], auth['period'],
";".join(auth['signedheaders']), auth['signature']))
return get_utf8_value(val)
def build_authorization(accesskey, signedheaders, period=1800, timestamp=None):
"""
build Authorization object
"""
auth = {}
auth['version'] = "bce-auth-v1"
auth['access'] = accesskey
if not timestamp:
auth['timestamp'] = get_canonical_time()
else:
auth['timestamp'] = timestamp
auth['period'] = str(period)
auth['signedheaders'] = signedheaders
return auth
def _calc_signingkey(auth, sk):
""" Get a a signing key """
string_to_sign = "/".join((auth['version'], auth['access'],
auth['timestamp'], auth['period']))
signingkey = hmac.new(bytes(sk, 'utf-8'), bytes(string_to_sign, 'utf-8'),
hashlib.sha256).hexdigest()
return signingkey
def get_utf8_value(value):
"""
Get the UTF8-encoded version of a value.
"""
if isinstance(value, bytes):
return value.decode('utf-8')
return value.encode('utf-8')
def normalized_uri(uri):
"""
Construct a normalized(except slash '/') uri
eg. /json-api/v1/example/ ==> /json-api/v1/example/
"""
return urllib.parse.quote(get_utf8_value(uri), safe='-_.~/')
def normalized(msg):
"""
Construct a normalized uri
"""
return urllib.parse.quote(get_utf8_value(msg), safe='-_.~')
def canonical_qs(params):
"""
Construct a sorted, correctly encoded query string
"""
keys = list(params)
keys.sort()
pairs = []
for key in keys:
if key == "authorization":
continue
val = normalized(params[key])
pairs.append(urllib.parse.quote(key, safe='') + '=' + val)
qs = '&'.join(pairs)
return qs
def canonical_header_str(headers, signedheaders=None):
"""
calculate canonicalized header string
"""
headers_norm_lower = dict()
for (k, v) in headers.items():
key_norm_lower = normalized(k.lower())
value_norm_lower = normalized(v.strip())
headers_norm_lower[key_norm_lower] = value_norm_lower
keys = list(headers_norm_lower)
keys.sort()
if "host" not in keys:
raise ValueError
header_list = []
default_signed = ("host", "content-length", "content-type", "content-md5")
if signedheaders:
for key in signedheaders:
key = normalized(key.lower())
if key not in keys:
raise ValueError
if headers_norm_lower[key]:
header_list.append(key + ":" + headers_norm_lower[key])
else:
for key in keys:
if key.startswith("x-bce-") or key in default_signed:
header_list.append(key + ":" + headers_norm_lower[key])
return '\n'.join(header_list)
def _calc_signature(key, request, signedheaders):
"""Generate BCE signature string."""
# Create canonical request
params = {}
headers = {}
# print request
if "params" in request:
params = request['params']
if "headers" in request:
headers = request['headers']
cr = "\n".join((request['method'].upper(),
normalized_uri(request['uri']),
canonical_qs(params),
canonical_header_str(headers, signedheaders)))
signature = hmac.new(bytes(key, 'utf-8'), bytes(cr, 'utf-8'), hashlib.sha256).hexdigest()
return signature
def send_request(url, method, ak, sk, headers={}, data=None, json=None):
"""
封装发送请求
"""
headers.update(get_headers(url, method, ak, sk))
request_func = getattr(requests, method)
if data:
res = request_func(url, headers=headers, data=data)
elif json:
res = request_func(url, headers=headers, json=json)
else:
res = request_func(url, headers=headers)
return res.json()
if __name__ == '__main__':
ak = "xxxx"
sk = "xxx"
# 方式1
jobcreate_url="http://aihc.bd.baidubce.com/api/v1/aijobs?resourcePoolId=cce-8c9zllli"
print('-------方式1--------')
print(send_request(jobcreate_url, "get", ak, sk))
# 方式2
print('-------方式2--------')
get_headers(jobcreate_url, "get", ak, sk)
res = requests.get(jobcreate_url, headers=get_headers(jobcreate_url, "get", ak, sk)).json()
print(res)
日期与时间规定
日期时间表示的地方一律采用UTC时间,遵循ISO 8601,并做以下约束:
- 表示日期一律采用YYYY-MM-DD方式,例如2014-06-01表示2014年6月1日
- 表示时间一律采用hh:mm:ss方式,并在最后加一个大写字母Z表示UTC时间。例如23:00:10Z表示UTC时间23点0分10秒。
- 凡涉及日期和时间合并表示时,在两者中间加大写字母T,例如2014-06-01T23:00:10Z表示UTC时间2014年6月1日23点0分10秒。