简介:本文详细解析前端如何通过fetch和axios请求DeepSeek流式接口,涵盖SSE原理、请求配置、数据处理及错误处理,提供完整代码示例与最佳实践。
流式接口(Streaming API)通过Server-Sent Events(SSE)技术实现服务器向客户端的实时数据推送,相比传统轮询或WebSocket,SSE具有轻量级、兼容性好(基于HTTP)的特点。DeepSeek的流式接口通过持续发送JSON格式的增量数据,使前端能够实时渲染AI生成内容(如对话、代码等),显著提升用户体验。
data: {json}\n\n格式传输,多个消息通过双换行符分隔Content-Type: text/event-stream和Cache-Control: no-cache
async function fetchStream(url, body) {const response = await fetch(url, {method: 'POST',headers: {'Content-Type': 'application/json','Authorization': 'Bearer YOUR_API_KEY' // DeepSeek认证},body: JSON.stringify(body)});if (!response.ok) throw new Error(`HTTP error! status: ${response.status}`);if (!response.headers.get('content-type')?.includes('event-stream')) {throw new Error('Invalid content-type');}return response.body; // 返回ReadableStream}
async function processStream(url, body, onMessage, onComplete, onError) {try {const stream = await fetchStream(url, body);const reader = stream.getReader();const decoder = new TextDecoder();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;const chunk = decoder.decode(value);buffer += chunk;// 处理可能的多消息拼接const messages = buffer.split('\n\n').filter(m => m.trim());buffer = '';for (const msg of messages) {if (msg.startsWith('data: ')) {const jsonStr = msg.substring(6).trim();try {const data = JSON.parse(jsonStr);onMessage(data); // 触发消息回调} catch (e) {console.error('Parse error:', e);}}}}onComplete?.();} catch (error) {onError?.(error);}}// 使用示例processStream('https://api.deepseek.com/v1/stream',{ prompt: '解释量子计算' },(data) => console.log('Received:', data.text),() => console.log('Stream completed'),(err) => console.error('Error:', err));
优点:
缺点:
import axios from 'axios';const axiosInstance = axios.create({baseURL: 'https://api.deepseek.com',headers: {'Authorization': 'Bearer YOUR_API_KEY','Accept': 'text/event-stream'},responseType: 'stream' // 关键配置});// 请求拦截器axiosInstance.interceptors.request.use(config => {config.timeout = 0; // 流式请求禁用超时return config;});
async function axiosStream(url, body, onMessage, onComplete, onError) {try {const response = await axiosInstance.post('/v1/stream', body, {onDownloadProgress: (progressEvent) => {// axios不直接提供SSE解析,需手动处理const rawData = progressEvent.currentTarget.response;// 实际应用中需通过transformRequest或自定义解析}});// 更推荐的方式:使用自定义transformResponseconst streamResponse = await axiosInstance.post('/v1/stream', body, {transformResponse: [data => {// 此处无法直接解析流,需结合ReadableStreamreturn data; // 实际需要更复杂的处理}]});// 实际可行方案:使用axios的适配器或结合fetch// 以下为简化版,实际需通过自定义adapter实现const stream = await new Promise((resolve, reject) => {axiosInstance.post('/v1/stream', body, {responseType: 'stream'}).then(res => {const stream = res.data; // Node.js Stream对象// 浏览器端axios无法直接获取ReadableStream// 实际开发中建议:// 1. 服务器返回文本流,前端用fetch解析// 2. 或使用axios-sse等第三方库reject(new Error('Browser axios cannot directly handle SSE'));}).catch(reject);});} catch (error) {onError?.(error);}}
由于浏览器端axios对SSE支持有限,推荐以下两种模式:
模式1:axios + 自定义解析
async function axiosWithFetchParse(url, body) {// 用axios获取初始响应(如认证)const { data: config } = await axios.post(url, body, {headers: { 'Accept': 'application/json' } // 先获取配置});// 再用fetch处理流const streamUrl = config.stream_url;return fetchStream(streamUrl, {});}
模式2:使用axios-sse库
npm install axios-sse
import { EventSourcePolyfill } from 'event-source-polyfill';import axios from 'axios';const sse = new EventSourcePolyfill('https://api.deepseek.com/stream', {headers: {'Authorization': 'Bearer YOUR_KEY','Content-Type': 'application/json'},axiosOptions: {// 可覆盖axios配置}});sse.onmessage = (event) => {const data = JSON.parse(event.data);console.log('Received:', data);};sse.onerror = (error) => {console.error('SSE error:', error);};
优点:
缺点:
let retryCount = 0;async function withRetry(fn, maxRetries = 5) {try {return await fn();} catch (error) {if (retryCount < maxRetries) {retryCount++;const delay = Math.min(1000 * Math.pow(2, retryCount), 5000);await new Promise(r => setTimeout(r, delay));return withRetry(fn, maxRetries);}throw error;}}
let debounceTimer;function debouncedProcess(data) {clearTimeout(debounceTimer);debounceTimer = setTimeout(() => {updateUI(data); // 批量更新UI}, 100);}
function classifyError(error) {if (error.message.includes('Failed to fetch')) {return { type: 'NETWORK', message: '网络连接失败' };} else if (error.response?.status === 401) {return { type: 'AUTH', message: '认证失败' };} else {return { type: 'SERVER', message: '服务器错误' };}}
import { useState, useEffect } from 'react';function DeepSeekChat() {const [messages, setMessages] = useState([]);const [loading, setLoading] = useState(false);useEffect(() => {const controller = new AbortController();async function startChat() {setLoading(true);try {await withRetry(async () => {const stream = await fetch('https://api.deepseek.com/stream', {method: 'POST',headers: { 'Authorization': 'Bearer YOUR_KEY' },body: JSON.stringify({ prompt: '你好' }),signal: controller.signal});processStream(stream, (data) => {setMessages(prev => [...prev, {id: Date.now(),text: data.text,isBot: true}]);});});} catch (error) {console.error('Chat error:', classifyError(error));} finally {setLoading(false);}}startChat();return () => controller.abort();}, []);return (<div>{messages.map(msg => (<div key={msg.id} className={msg.isBot ? 'bot' : 'user'}>{msg.text}</div>))}{loading && <div>思考中...</div>}</div>);}
| 方案 | 适用场景 | 复杂度 | 依赖管理 |
|---|---|---|---|
| 原生fetch | 轻量级应用,追求最小依赖 | 中 | 无 |
| axios+polyfill | 已有axios体系的项目 | 高 | 需polyfill |
| 专用库 | 需要开箱即用解决方案 | 低 | 需引入库 |
推荐方案:
eventsource或rxjs的流处理通过合理选择技术方案并实现健壮的错误处理机制,前端应用可以高效稳定地接入DeepSeek流式接口,为用户提供流畅的实时交互体验。