简介:本文详细解析前端如何通过fetch和axios两种方式请求deepseek流式接口,涵盖基础概念、实现步骤、代码示例及异常处理,助力开发者高效完成流式数据交互。
流式接口(Streaming API)通过分块传输数据实现实时交互,特别适用于生成式AI对话、实时日志推送等场景。在deepseek的AI服务中,流式接口可实现”边生成边显示”的对话效果,显著提升用户体验。相比传统一次性返回全部数据的接口,流式接口具有三大优势:
async function fetchStream(prompt) {const response = await fetch('https://api.deepseek.com/v1/chat/stream', {method: 'POST',headers: {'Content-Type': 'application/json','Authorization': `Bearer ${API_KEY}`},body: JSON.stringify({model: 'deepseek-chat',messages: [{role: 'user', content: prompt}],stream: true // 关键参数})});if (!response.ok) throw new Error(`HTTP error! status: ${response.status}`);return response.body; // 返回ReadableStream}
通过TextDecoder和ReadableStream的API实现流式解析:
async function processStream(stream) {const reader = stream.getReader();const decoder = new TextDecoder();let partialResponse = '';while (true) {const { done, value } = await reader.read();if (done) break;const chunk = decoder.decode(value, { stream: true });partialResponse += chunk;// 处理可能的完整JSON分块const messages = partialResponse.split('\n\n');messages.forEach(msg => {if (msg.trim() && !msg.startsWith('data: [DONE]')) {try {const data = JSON.parse(msg.replace('data: ', ''));if (data.choices[0].delta?.content) {console.log('Received:', data.choices[0].delta.content);// 更新UI的逻辑}} catch (e) {// 忽略不完整的JSON}}});partialResponse = messages[messages.length - 1];}}
async function chatWithDeepseek(prompt) {try {const stream = await fetchStream(prompt);await processStream(stream);} catch (error) {console.error('Stream error:', error);// 错误处理逻辑}}
axios需要显式设置responseType: 'stream':
const axios = require('axios'); // Node环境// 或使用axios的浏览器版本async function axiosStream(prompt) {const response = await axios({method: 'post',url: 'https://api.deepseek.com/v1/chat/stream',headers: {'Content-Type': 'application/json','Authorization': `Bearer ${API_KEY}`},data: {model: 'deepseek-chat',messages: [{role: 'user', content: prompt}],stream: true},responseType: 'stream' // 关键配置});return response.data; // Node中的stream.Readable}
在浏览器环境中,axios的流式处理需要借助transformResponse:
async function browserAxiosStream(prompt) {const response = await axios({method: 'post',url: 'https://api.deepseek.com/v1/chat/stream',headers: { /* 同上 */ },data: { /* 同上 */ },transformResponse: [data => {// axios浏览器端默认不解析流,需手动处理const reader = data.body?.getReader();// 需自行实现类似fetch的解析逻辑return { reader };}]});// 实际开发中建议:// 1. 使用fetch处理浏览器流// 2. 或通过后端中转流数据}
async function nodeAxiosChat(prompt) {const stream = await axiosStream(prompt);stream.on('data', (chunk) => {const text = chunk.toString();// 处理SSE格式数据text.split('\n\n').forEach(msg => {if (msg.trim() && !msg.startsWith('data: [DONE]')) {try {const data = JSON.parse(msg.replace('data: ', ''));// 处理delta内容} catch (e) { /* 忽略错误 */ }}});});stream.on('end', () => console.log('Stream complete'));stream.on('error', (err) => console.error('Stream error:', err));}
| 特性 | fetch方案 | axios方案 |
|---|---|---|
| 浏览器兼容性 | 原生支持,无需额外依赖 | 需处理浏览器流差异 |
| 错误处理 | 通过Promise.reject | 可通过事件监听或catch |
| 取消请求 | 使用AbortController | 使用CancelToken或signal |
| 进度监控 | 通过reader.read()的done状态 | 通过’data’事件监听 |
| 配置灵活性 | 需手动实现较多功能 | 提供更丰富的配置选项 |
重试机制实现:
async function withRetry(fn, retries = 3) {for (let i = 0; i < retries; i++) {try {return await fn();} catch (error) {if (i === retries - 1) throw error;await new Promise(res => setTimeout(res, 1000 * (i + 1)));}}}
背压控制:当UI渲染速度跟不上数据流时,实现缓冲机制:
class StreamBuffer {constructor() {this.queue = [];this.isProcessing = false;}async enqueue(data) {this.queue.push(data);if (!this.isProcessing) {this.isProcessing = true;await this.processQueue();}}async processQueue() {while (this.queue.length > 0) {const data = this.queue.shift();// 渲染逻辑await new Promise(res => setTimeout(res, 16)); // 模拟60fps渲染}this.isProcessing = false;}}
安全建议:
数据分块不完整:
\n\n作为消息分隔符(SSE格式)连接中断处理:
async function resilientStream(prompt) {let retryCount = 0;while (retryCount < 3) {try {const controller = new AbortController();const timeoutId = setTimeout(() => controller.abort(), 10000);const stream = await fetchStream(prompt, {signal: controller.signal});clearTimeout(timeoutId);await processStream(stream);return;} catch (error) {retryCount++;if (retryCount === 3) throw error;await new Promise(res => setTimeout(res, 1000 * retryCount));}}}
性能优化:
TextDecoder的流式解码
class DeepSeekStreamParser {constructor(onMessage, onComplete, onError) {this.onMessage = onMessage;this.onComplete = onComplete;this.onError = onError;this.buffer = '';}write(chunk) {this.buffer += chunk;this.processBuffer();}processBuffer() {const messages = this.buffer.split('\n\n');messages.slice(0, -1).forEach(msg => {if (msg.trim() && !msg.startsWith('data: [DONE]')) {try {const data = JSON.parse(msg.replace('data: ', ''));if (data.choices[0].delta?.content) {this.onMessage(data.choices[0].delta.content);}} catch (e) {// 忽略解析错误}}});this.buffer = messages[messages.length - 1] || '';}end() {if (this.buffer.trim()) this.processBuffer();this.onComplete();}}// 使用示例const parser = new DeepSeekStreamParser(content => console.log('New content:', content),() => console.log('Stream completed'),error => console.error('Parser error:', error));// 在fetch的processStream中替换解析逻辑
浏览器环境首选fetch:
Node环境按需选择:
关键考量因素:
通过合理选择和实现流式接口,前端应用可以显著提升与deepseek等AI服务的交互体验,实现真正的实时对话效果。建议开发者根据具体场景进行技术选型,并始终将错误处理和性能优化作为实现重点。