SampleCode
更新时间:2024-08-30
该部分主要集成了百度智能云通用鉴权说明,用户可以在HTTP请求的Header中包含Authorization信息,以验证请求者的身份有效性,即:在HTTP Header中加入Authorization:<认证字符串>。
以标记号码查询为例,进行说明。请查看下面示例:
Python示例:
Plain Text
1# -*- coding: UTF-8 -*-
2import hashlib
3import hmac
4import string
5import datetime
6import requests
7import json
8import time
9
10AUTHORIZATION = "authorization"
11BCE_PREFIX = "x-bce-"
12DEFAULT_ENCODING = 'UTF-8'
13
14
15# 保存AK/SK的类
16class BceCredentials(object):
17 def __init__(self, access_key_id, secret_access_key):
18 self.access_key_id = access_key_id
19 self.secret_access_key = secret_access_key
20
21
22# 根据RFC 3986,除了:
23# 1.大小写英文字符
24# 2.阿拉伯数字
25# 3.点'.'、波浪线'~'、减号'-'以及下划线'_'
26# 以外都要编码
27RESERVED_CHAR_SET = set(string.ascii_letters + string.digits + '.~-_')
28
29
30def get_normalized_char(i):
31 char = chr(i)
32 if char in RESERVED_CHAR_SET:
33 return char
34 else:
35 return '%%%02X' % i
36
37
38NORMALIZED_CHAR_LIST = [get_normalized_char(i) for i in range(256)]
39
40
41# 正规化字符串
42def normalize_string(in_str, encoding_slash=True):
43 if in_str is None:
44 return ''
45
46 # 如果输入是unicode,则先使用UTF8编码之后再编码
47 in_str = in_str.encode(DEFAULT_ENCODING) if isinstance(
48 in_str, unicode) else str(in_str)
49
50 # 在生成规范URI时。不需要对斜杠'/'进行编码,其他情况下都需要
51 if encoding_slash:
52 def encode_f(c): return NORMALIZED_CHAR_LIST[ord(c)]
53 else:
54 # 仅仅在生成规范URI时。不需要对斜杠'/'进行编码
55 def encode_f(c): return NORMALIZED_CHAR_LIST[ord(c)] if c != '/' else c
56
57 # 按照RFC 3986进行编码
58 return ''.join([encode_f(ch) for ch in in_str])
59
60
61# 生成规范时间戳
62def get_canonical_time(timestamp=0):
63 # 不使用任何参数调用的时候返回当前时间
64 if timestamp == 0:
65 utctime = datetime.datetime.utcnow()
66 else:
67 utctime = datetime.datetime.utcfromtimestamp(timestamp)
68
69 # 时间戳格式:[year]-[month]-[day]T[hour]:[minute]:[second]Z
70 return "%04d-%02d-%02dT%02d:%02d:%02dZ" % (
71 utctime.year, utctime.month, utctime.day,
72 utctime.hour, utctime.minute, utctime.second)
73
74
75# 生成规范URI
76def get_canonical_uri(path):
77 # 规范化URI的格式为:/{bucket}/{object},并且要对除了斜杠"/"之外的所有字符编码
78 return normalize_string(path, False)
79
80
81# 生成规范query string
82def get_canonical_querystring(params):
83 if params is None:
84 return ''
85
86 # 除了authorization之外,所有的query string全部加入编码
87 result = ['%s=%s' % (k, normalize_string(v)) for k, v in params.items() if
88 k.lower != AUTHORIZATION]
89
90 # 按字典序排序
91 result.sort()
92
93 # 使用&符号连接所有字符串并返回
94 return '&'.join(result)
95
96
97# 生成规范header
98def get_canonical_headers(headers, headers_to_sign=None):
99 headers = headers or {}
100
101 # 没有指定header_to_sign的情况下,默认使用:
102 # 1.host
103 # 2.content-md5
104 # 3.content-length
105 # 4.content-type
106 # 5.所有以x-bce-开头的header项
107 # 生成规范header
108 if headers_to_sign is None or len(headers_to_sign) == 0:
109 headers_to_sign = {"host", "content-md5",
110 "content-length", "content-type"}
111
112 # 对于header中的key,去掉前后的空白之后需要转化为小写
113 # 对于header中的value,转化为str之后去掉前后的空白
114 def f((key, value)): return (key.strip().lower(), str(value).strip())
115
116 result = []
117 for k, v in map(f, headers.iteritems()):
118 # 无论何种情况,以x-bce-开头的header项都需要被添加到规范header中
119 if k.startswith(BCE_PREFIX) or k in headers_to_sign:
120 result.append("%s:%s" % (normalize_string(k), normalize_string(v)))
121
122 # 按照字典序排序
123 result.sort()
124
125 # 使用\n符号连接所有字符串并返回
126 return '\n'.join(result)
127
128
129# 签名主算法
130def sign(credentials, http_method, path, headers, params,
131 timestamp=0, expiration_in_seconds=18000, headers_to_sign=None):
132 headers = headers or {}
133 params = params or {}
134
135 # 1.生成sign key
136 # 1.1.生成auth-string,格式为:bce-auth-v1/{accessKeyId}/{timestamp}/{expirationPeriodInSeconds}
137 sign_key_info = 'bce-auth-v1/%s/%s/%d' % (
138 credentials.access_key_id,
139 get_canonical_time(timestamp),
140 expiration_in_seconds)
141 # 1.2.使用auth-string加上SK,用SHA-256生成sign key
142 sign_key = hmac.new(
143 credentials.secret_access_key,
144 sign_key_info,
145 hashlib.sha256).hexdigest()
146
147 # 2.生成规范化uri
148 canonical_uri = get_canonical_uri(path)
149
150 # 3.生成规范化query string
151 canonical_querystring = get_canonical_querystring(params)
152
153 # 4.生成规范化header
154 canonical_headers = get_canonical_headers(headers, headers_to_sign)
155
156 # 5.使用'\n'将HTTP METHOD和2、3、4中的结果连接起来,成为一个大字符串
157 string_to_sign = '\n'.join(
158 [http_method, canonical_uri, canonical_querystring, canonical_headers])
159
160 # 6.使用5中生成的签名串和1中生成的sign key,用SHA-256算法生成签名结果
161 sign_result = hmac.new(sign_key, string_to_sign,
162 hashlib.sha256).hexdigest()
163
164 # 7.拼接最终签名结果串
165 if headers_to_sign:
166 # 指定header to sign
167 result = '%s/%s/%s' % (sign_key_info,
168 ';'.join(headers_to_sign), sign_result)
169 else:
170 # 不指定header to sign情况下的默认签名结果串,但百度智能云API的唯一要求是Host域必须被编码,所以此处需要至少添加上host
171 result = '%s/host/%s' % (sign_key_info, sign_result) #官网demo需要改的地方
172 return result
173
174
175if __name__ == "__main__":
176 # 填写AK SK
177 credentials = BceCredentials(
178 "ak", "sk") # 填写ak、sk
179 # API接口的请求方法
180 http_method = "POST"
181 # 接口请求路径
182 path = "/haoma-cloud/openapi/phone-tag/1.0"
183 # 接口请求的header头
184 headers = {
185 "host": "pnvs.baidubce.com",
186 "content-type": "application/json; charset=utf-8",
187 "x-bce-date": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
188 }
189 # 接口请求参数
190 params = {
191 }
192 # 接口请求的body数据
193 body = {
194 "phone": "",
195 "appkey": "开通服务时颁发的appkey"
196 }
197 # 设置参与鉴权编码的header,即headers_to_sign,至少包含host,百度智能云API的唯一要求是Host域必须被编码
198 headers_to_sign = {
199 "host",
200 "x-bce-date",
201 }
202 # 设置到期时间,默认1800s
203 expiration_in_seconds = 18000
204 # 设置参与鉴权的时间戳
205 timestamp = int(time.time())
206 # 生成鉴权字符串
207 result = sign(credentials, http_method, path, headers, params, timestamp, expiration_in_seconds,
208 headers_to_sign)
209 print result
210 # 使用request进行请求接口
211 request = {
212 'method': http_method,
213 'uri': path,
214 'headers': headers,
215 'params': params
216 }
217 # headers字典中需要加上鉴权字符串authorization的请求头
218 headers['authorization'] = result
219 print headers
220
221 # 拼接接口的url地址
222 url = 'http://%s%s' % (headers['host'],
223 request['uri'])
224
225 # 发起请求
226 response = requests.request(
227 request["method"], url, headers=headers, data=json.dumps(body))
228 # 打印请求结果
229 print 'url: ', response.url
230 print 'status: ', response.status_code
231 print 'headers: ', response.headers
232 print 'response: ', response.text