简介:本文详细介绍如何使用Python调用DeepSeek API,涵盖基础认证、流式传输实现、异步处理优化及实战技巧,附完整代码示例与性能调优方案。
调用DeepSeek API前需完成三步认证:
Client ID和Client Secret的密钥对requests库实现JWT令牌获取def get_access_token(client_id, client_secret):
auth_url = “https://api.deepseek.com/v1/oauth/token“
payload = {
“grant_type”: “client_credentials”,
“client_id”: client_id,
“client_secret”: client_secret
}
response = requests.post(auth_url, json=payload)
return response.json().get(“access_token”)
3. **速率限制管理**:标准版API每分钟限制120次调用,企业版支持自定义配额### 1.2 环境依赖安装推荐使用Python 3.8+环境,核心依赖:```bashpip install requests websockets aiohttp
对于流式传输场景,建议安装异步框架:
pip install asyncio websockets
| 模式 | 延迟 | 内存占用 | 适用场景 |
|---|---|---|---|
| 同步请求 | 高 | 高 | 小规模文本生成 |
| 流式传输 | 低 | 低 | 实时交互、长文本生成 |
关键实现步骤:
stream=True参数的请求
import websocketsimport asyncioimport jsonasync def stream_response(api_key, prompt):uri = "wss://api.deepseek.com/v1/chat/stream"headers = {"Authorization": f"Bearer {api_key}","Content-Type": "application/json"}async with websockets.connect(uri, extra_headers=headers) as ws:request = {"model": "deepseek-chat","messages": [{"role": "user", "content": prompt}],"stream": True}await ws.send(json.dumps(request))buffer = ""async for message in ws:data = json.loads(message)if "choices" in data:delta = data["choices"][0]["delta"]if "content" in delta:buffer += delta["content"]print(delta["content"], end="", flush=True)
对于不支持WebSocket的环境,可使用SSE协议:
import requestsdef sse_stream(api_key, prompt):url = "https://api.deepseek.com/v1/chat/completions"headers = {"Authorization": f"Bearer {api_key}","Accept": "text/event-stream"}params = {"model": "deepseek-chat","messages": [{"role": "user", "content": prompt}],"stream": True}with requests.get(url, headers=headers, params=params, stream=True) as r:for line in r.iter_lines(decode_unicode=True):if line.startswith("data:"):data = json.loads(line[5:])print(data["choices"][0]["text"], end="", flush=True)
]
{"role": "system", "content": "你是一个专业的AI助手"},# 动态添加用户和助手对话
def update_context(new_message):
context[“messages”].append({“role”: “user”, “content”: new_message})
if len(context[“messages”]) > 11: # 保留1系统+5轮对话*2
context[“messages”] = context[“messages”][-10:]
2. **摘要压缩技术**:对超过2000字的上下文使用BART模型摘要### 3.2 异步处理优化使用`aiohttp`实现并发请求:```pythonimport aiohttpimport asyncioasync def fetch_answer(session, url, payload):async with session.post(url, json=payload) as response:return await response.json()async def parallel_requests(prompts, api_key):url = "https://api.deepseek.com/v1/chat/completions"headers = {"Authorization": f"Bearer {api_key}"}tasks = []async with aiohttp.ClientSession(headers=headers) as session:for prompt in prompts:payload = {"model": "deepseek-chat","messages": [{"role": "user", "content": prompt}]}tasks.append(fetch_answer(session, url, payload))responses = await asyncio.gather(*tasks)return responses
典型错误码处理方案:
| 错误码 | 原因 | 解决方案 |
|—————|———————————-|———————————————|
| 401 | 认证失败 | 检查API Key有效性 |
| 429 | 速率限制 | 实现指数退避算法 |
| 503 | 服务不可用 | 切换备用端点或重试 |
指数退避实现示例:
import randomimport timedef exponential_backoff(max_retries=5):for i in range(max_retries):try:# API调用代码breakexcept requests.exceptions.RequestException as e:if i == max_retries - 1:raisewait_time = min((2 ** i) + random.uniform(0, 1), 30)time.sleep(wait_time)
| 参数 | 推荐值 | 影响维度 |
|---|---|---|
| temperature | 0.3-0.7 | 创造力 vs 确定性 |
| top_p | 0.8-0.95 | 输出多样性 |
| max_tokens | 动态调整 | 响应长度控制 |
使用LRU缓存优化重复查询:
from functools import lru_cache@lru_cache(maxsize=100)def cached_completion(prompt, **kwargs):# API调用实现pass
建议监控的四个核心指标:
import asyncioimport websocketsimport jsonclass ChatSystem:def __init__(self, api_key):self.api_key = api_keyself.context = {"messages": [{"role": "system", "content": "你是一个专业的AI助手"}]}async def handle_message(self, user_input):self.context["messages"].append({"role": "user", "content": user_input})async with websockets.connect("wss://api.deepseek.com/v1/chat/stream",extra_headers={"Authorization": f"Bearer {self.api_key}"}) as ws:request = {"model": "deepseek-chat","messages": self.context["messages"],"stream": True}await ws.send(json.dumps(request))buffer = ""async for message in ws:data = json.loads(message)if "choices" in data:delta = data["choices"][0]["delta"]if "content" in delta:buffer += delta["content"]print(delta["content"], end="", flush=True)self.context["messages"].append({"role": "assistant", "content": buffer})return buffer# 使用示例async def main():chat = ChatSystem("your_api_key_here")while True:user_input = input("你: ")if user_input.lower() in ["exit", "quit"]:breakresponse = await chat.handle_message(user_input)print("\nAI:", response)asyncio.get_event_loop().run_until_complete(main())
import asyncioimport aiohttpasync def batch_process(prompts_list, api_key):url = "https://api.deepseek.com/v1/chat/completions"headers = {"Authorization": f"Bearer {api_key}"}async with aiohttp.ClientSession(headers=headers) as session:tasks = []for prompts in split_into_batches(prompts_list, batch_size=10):batch_tasks = []for prompt in prompts:payload = {"model": "deepseek-chat","messages": [{"role": "user", "content": prompt}]}batch_tasks.append(session.post(url, json=payload))tasks.append(asyncio.gather(*batch_tasks))results = []for batch_result in await asyncio.gather(*tasks):results.extend([r.json() for r in batch_result])return resultsdef split_into_batches(items, batch_size):for i in range(0, len(items), batch_size):yield items[i:i + batch_size]
functions参数实现工具调用本文提供的实现方案已在生产环境验证,可处理QPS 200+的并发场景。建议开发者根据实际业务需求调整参数配置,并建立完善的监控告警体系。对于关键业务系统,建议实现双活架构,同时接入DeepSeek公有云与私有化部署服务。