Next.js 15流式AI聊天:从SSE到React Server Actions的6种生产模式
AI聊天为什么总让人觉得卡
你打开一个AI聊天页面,输入问题,然后盯着空白对话框等了8秒——LLM终于吐出一大段文字,但用户早以为页面挂了。传统请求-响应模式下,用户感知延迟 = 网络延迟 + LLM首token时间 + 全量生成时间,体感就是"卡"。流式响应(Streaming)把等待拆成一个个token逐步推送,用户在0.5秒内看到第一个字,感知延迟降低80%。
Next.js 15在流式AI聊天场景下提供了从底层SSE到高层React Server Actions的完整工具链。本文将带你完成SSE流式响应→React Server Actions + AI→Vercel AI SDK集成→多模型路由与降级→对话状态与上下文管理→生产部署与性能优化的6种生产模式,从协议到上线,一步不落。
核心要点
- SSE是LLM流式输出的最佳传输协议,Next.js Route Handler原生支持
- React Server Actions让AI调用无需手写API端点,类型安全零样板
- Vercel AI SDK统一了OpenAI/Anthropic/Google等多家LLM的流式接口
- 多模型路由实现成本优化和容灾降级,GPT-4o挂了自动切Claude
- 对话状态管理需要区分短期上下文和长期记忆,避免token爆炸
- 生产部署要处理并发连接数、超时、背压和错误恢复
目录
- 流式AI聊天架构全景
- Pattern 1: SSE流式响应实现
- Pattern 2: React Server Actions + AI
- Pattern 3: Vercel AI SDK集成
- Pattern 4: 多模型路由与降级
- Pattern 5: 对话状态与上下文管理
- Pattern 6: 生产部署与性能优化
- 5个常见坑及解决方案
- 10个常见报错排查
- 进阶优化技巧
- 对比分析:SSE vs WebSocket vs 长轮询
- 在线工具推荐
- 总结
流式AI聊天架构全景
┌─────────────────────────────────────────────────────────────┐
│ Next.js 15 AI Chat 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ SSE/Stream ┌──────────────────────┐ │
│ │ Client │ ◄────────────── │ Route Handler / │ │
│ │ Chat UI │ │ Server Action │ │
│ │ │ ──────────────► │ │ │
│ └──────────┘ POST请求 └──────────┬───────────┘ │
│ │ │
│ ┌─────────────┼──────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ OpenAI │ │ Anthropic│ │ 本地 │ │
│ │ GPT-4o │ │ Claude │ │ Ollama│ │
│ └──────────┘ └──────────┘ └──────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 对话状态层 (Conversation State) │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Context │ │ History │ │ Long-term Memory │ │ │
│ │ │ Window │ │ Store │ │ (Vector DB) │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
技术选型决策树
需要流式AI聊天?
├── 快速原型 → Vercel AI SDK (Pattern 3)
├── 完全控制 → SSE + Route Handler (Pattern 1)
├── 类型安全优先 → React Server Actions (Pattern 2)
├── 多模型需求 → 多模型路由 (Pattern 4)
└── 企业级生产 → 全部组合 + 状态管理 (Pattern 5+6)
Pattern 1: SSE流式响应实现
SSE(Server-Sent Events)是LLM流式输出的标准传输协议。Next.js 15的Route Handler原生支持ReadableStream,可以直接返回SSE格式的流式响应。
基础SSE Route Handler
// app/api/chat/sse/route.ts
import { NextRequest } from 'next/server';
export const runtime = 'edge';
export const maxDuration = 60;
interface ChatMessage {
role: 'system' | 'user' | 'assistant';
content: string;
}
export async function POST(request: NextRequest) {
const { messages }: { messages: ChatMessage[] } = await request.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: 'gpt-4o',
messages,
stream: true,
}),
});
if (!response.ok) {
const errorData = await response.text();
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ error: errorData })}\n\n`)
);
controller.close();
return;
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n').filter((line) => line.startsWith('data: '));
for (const line of lines) {
const data = line.slice(6);
if (data === '[DONE]') {
controller.enqueue(encoder.encode('data: [DONE]\n\n'));
continue;
}
try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ content })}\n\n`)
);
}
} catch {
// skip malformed chunks
}
}
}
controller.close();
} catch (error) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ error: String(error) })}\n\n`)
);
controller.close();
}
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
}
客户端SSE消费
// components/ChatSSE.tsx
'use client';
import { useState, useRef, useCallback } from 'react';
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
}
export default function ChatSSE() {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const abortControllerRef = useRef<AbortController | null>(null);
const sendMessage = useCallback(async () => {
if (!input.trim() || isStreaming) return;
const userMessage: Message = {
id: crypto.randomUUID(),
role: 'user',
content: input.trim(),
};
const assistantMessage: Message = {
id: crypto.randomUUID(),
role: 'assistant',
content: '',
};
setMessages((prev) => [...prev, userMessage, assistantMessage]);
setInput('');
setIsStreaming(true);
const abortController = new AbortController();
abortControllerRef.current = abortController;
try {
const response = await fetch('/api/chat/sse', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [...messages, userMessage].map((m) => ({
role: m.role,
content: m.content,
})),
}),
signal: abortController.signal,
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n').filter((line) => line.startsWith('data: '));
for (const line of lines) {
const data = line.slice(6);
if (data === '[DONE]') continue;
try {
const parsed = JSON.parse(data);
if (parsed.error) {
console.error('SSE error:', parsed.error);
continue;
}
if (parsed.content) {
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMessage.id
? { ...m, content: m.content + parsed.content }
: m
)
);
}
} catch {
// skip malformed data
}
}
}
} catch (error) {
if ((error as Error).name !== 'AbortError') {
console.error('Stream error:', error);
}
} finally {
setIsStreaming(false);
abortControllerRef.current = null;
}
}, [input, messages, isStreaming]);
const stopStreaming = useCallback(() => {
abortControllerRef.current?.abort();
}, []);
return (
<div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((msg) => (
<div
key={msg.id}
className={`p-3 rounded-lg ${
msg.role === 'user'
? 'bg-blue-600 text-white ml-auto max-w-[80%]'
: 'bg-gray-100 text-gray-900 mr-auto max-w-[80%]'
}`}
>
<pre className="whitespace-pre-wrap font-sans text-sm">{msg.content}</pre>
</div>
))}
</div>
<div className="flex gap-2">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && !e.shiftKey && sendMessage()}
placeholder="输入消息..."
className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
disabled={isStreaming}
/>
<button
onClick={isStreaming ? stopStreaming : sendMessage}
className={`px-4 py-2 rounded-lg font-medium ${
isStreaming
? 'bg-red-500 hover:bg-red-600 text-white'
: 'bg-blue-600 hover:bg-blue-700 text-white'
}`}
>
{isStreaming ? '停止' : '发送'}
</button>
</div>
</div>
);
}
Pattern 2: React Server Actions + AI
React Server Actions让AI调用无需手写API端点,直接在Server Component中定义异步函数,客户端通过useActionState调用,类型安全零样板。
Server Action定义
// app/actions/chat-action.ts
'use server';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
interface ChatState {
messages: Array<{ role: 'user' | 'assistant'; content: string }>;
error?: string;
}
export async function chatAction(
prevState: ChatState,
formData: FormData
): Promise<ChatState> {
const userInput = formData.get('message') as string;
if (!userInput?.trim()) {
return { ...prevState, error: '消息不能为空' };
}
const newMessages = [
...prevState.messages,
{ role: 'user' as const, content: userInput.trim() },
];
try {
const result = await streamText({
model: openai('gpt-4o'),
messages: newMessages,
});
const text = await result.text;
return {
messages: [
...newMessages,
{ role: 'assistant' as const, content: text },
],
};
} catch (error) {
return {
...prevState,
messages: newMessages,
error: `AI调用失败: ${String(error)}`,
};
}
}
流式Server Action
// app/actions/streaming-chat-action.ts
'use server';
import { createStreamableValue } from 'ai/rsc';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
export async function streamingChatAction(messages: Array<{ role: 'user' | 'assistant'; content: string }>) {
const streamableValue = createStreamableValue('');
(async () => {
try {
const result = await streamText({
model: openai('gpt-4o'),
messages,
});
for await (const chunk of result.textStream) {
streamableValue.update(chunk);
}
streamableValue.done();
} catch (error) {
streamableValue.error(String(error));
}
})();
return streamableValue.value;
}
客户端消费流式Server Action
// components/ChatServerAction.tsx
'use client';
import { useActionState } from 'react';
import { readStreamableValue } from 'ai/rsc';
import { streamingChatAction } from '@/app/actions/streaming-chat-action';
import { useState, useRef, useCallback } from 'react';
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
}
export default function ChatServerAction() {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const handleSubmit = useCallback(async () => {
if (!input.trim() || isStreaming) return;
const userMessage: Message = {
id: crypto.randomUUID(),
role: 'user',
content: input.trim(),
};
const assistantMessage: Message = {
id: crypto.randomUUID(),
role: 'assistant',
content: '',
};
const updatedMessages = [...messages, userMessage];
setMessages([...updatedMessages, assistantMessage]);
setInput('');
setIsStreaming(true);
try {
const streamValue = await streamingChatAction(
updatedMessages.map((m) => ({ role: m.role, content: m.content }))
);
for await (const chunk of readStreamableValue(streamValue)) {
if (chunk) {
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMessage.id
? { ...m, content: m.content + chunk }
: m
)
);
}
}
} catch (error) {
console.error('Server Action error:', error);
} finally {
setIsStreaming(false);
}
}, [input, messages, isStreaming]);
return (
<div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((msg) => (
<div
key={msg.id}
className={`p-3 rounded-lg ${
msg.role === 'user'
? 'bg-blue-600 text-white ml-auto max-w-[80%]'
: 'bg-gray-100 text-gray-900 mr-auto max-w-[80%]'
}`}
>
<pre className="whitespace-pre-wrap font-sans text-sm">{msg.content}</pre>
</div>
))}
</div>
<div className="flex gap-2">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && !e.shiftKey && handleSubmit()}
placeholder="输入消息..."
className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
disabled={isStreaming}
/>
<button
onClick={handleSubmit}
disabled={isStreaming}
className="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white rounded-lg font-medium disabled:opacity-50"
>
{isStreaming ? '生成中...' : '发送'}
</button>
</div>
</div>
);
}
Pattern 3: Vercel AI SDK集成
Vercel AI SDK(ai包)是Next.js流式AI聊天的官方推荐方案,统一了多家LLM的流式接口,提供useChat等开箱即用的Hook。
安装与配置
npm install ai @ai-sdk/openai @ai-sdk/anthropic @ai-sdk/google
Route Handler(AI SDK版)
// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';
export const runtime = 'edge';
export const maxDuration = 60;
export async function POST(request: Request) {
const { messages } = await request.json();
const result = streamText({
model: openai('gpt-4o'),
system: '你是一个有帮助的AI助手,用简洁准确的方式回答问题。',
messages,
maxTokens: 4096,
temperature: 0.7,
});
return result.toDataStreamResponse();
}
useChat Hook(最简实现)
// components/ChatAISDK.tsx
'use client';
import { useChat } from '@ai-sdk/react';
export default function ChatAISDK() {
const { messages, input, handleInputChange, handleSubmit, isLoading, stop } =
useChat({
api: '/api/chat',
onError: (error) => {
console.error('Chat error:', error);
},
onFinish: (message) => {
console.log('Finished:', message.content.length, 'chars');
},
});
return (
<div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((msg) => (
<div
key={msg.id}
className={`p-3 rounded-lg ${
msg.role === 'user'
? 'bg-blue-600 text-white ml-auto max-w-[80%]'
: 'bg-gray-100 text-gray-900 mr-auto max-w-[80%]'
}`}
>
<div className="whitespace-pre-wrap text-sm">{msg.content}</div>
</div>
))}
{isLoading && messages[messages.length - 1]?.role === 'user' && (
<div className="bg-gray-100 text-gray-900 mr-auto max-w-[80%] p-3 rounded-lg">
<div className="flex items-center gap-2 text-sm text-gray-500">
<div className="animate-pulse">●</div>
<div className="animate-pulse delay-75">●</div>
<div className="animate-pulse delay-150">●</div>
</div>
</div>
)}
</div>
<form onSubmit={handleSubmit} className="flex gap-2">
<input
value={input}
onChange={handleInputChange}
placeholder="输入消息..."
className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
/>
<button
type={isLoading ? 'button' : 'submit'}
onClick={isLoading ? stop : undefined}
className={`px-4 py-2 rounded-lg font-medium ${
isLoading
? 'bg-red-500 hover:bg-red-600 text-white'
: 'bg-blue-600 hover:bg-blue-700 text-white'
}`}
>
{isLoading ? '停止' : '发送'}
</button>
</form>
</div>
);
}
AI SDK高级配置
// app/api/chat/advanced/route.ts
import { openai } from '@ai-sdk/openai';
import { streamText, createDataStreamResponse } from 'ai';
import { z } from 'zod';
export async function POST(request: Request) {
const { messages, conversationId } = await request.json();
const result = streamText({
model: openai('gpt-4o'),
system: '你是一个有帮助的AI助手。',
messages,
maxTokens: 4096,
temperature: 0.7,
tools: {
searchWeb: {
description: '搜索互联网获取最新信息',
parameters: z.object({
query: z.string().describe('搜索关键词'),
}),
execute: async ({ query }) => {
const res = await fetch(
`https://api.search.example.com/search?q=${encodeURIComponent(query)}`
);
return res.json();
},
},
},
onChunk: async ({ chunk }) => {
if (chunk.type === 'tool-call') {
console.log(`[Chat ${conversationId}] Tool call:`, chunk.toolName);
}
},
onFinish: async ({ response, usage }) => {
console.log(`[Chat ${conversationId}] Tokens:`, usage);
},
});
return result.toDataStreamResponse({
headers: {
'X-Conversation-Id': conversationId,
},
});
}
Pattern 4: 多模型路由与降级
生产环境中不能只依赖一个LLM供应商。多模型路由实现成本优化和容灾降级——GPT-4o挂了自动切Claude,简单问题用GPT-4o-mini省钱。
模型路由器
// lib/ai/model-router.ts
import { LanguageModelV1 } from 'ai';
import { openai } from '@ai-sdk/openai';
import { anthropic } from '@ai-sdk/anthropic';
import { google } from '@ai-sdk/google';
type ModelTier = 'fast' | 'standard' | 'premium';
interface ModelConfig {
model: LanguageModelV1;
name: string;
tier: ModelTier;
maxRetries: number;
timeoutMs: number;
costPer1kTokens: number;
}
const MODEL_REGISTRY: Record<ModelTier, ModelConfig[]> = {
fast: [
{
model: openai('gpt-4o-mini'),
name: 'gpt-4o-mini',
tier: 'fast',
maxRetries: 2,
timeoutMs: 15000,
costPer1kTokens: 0.00015,
},
{
model: anthropic('claude-3-5-haiku-2024-10-22'),
name: 'claude-3.5-haiku',
tier: 'fast',
maxRetries: 2,
timeoutMs: 15000,
costPer1kTokens: 0.00025,
},
],
standard: [
{
model: openai('gpt-4o'),
name: 'gpt-4o',
tier: 'standard',
maxRetries: 2,
timeoutMs: 30000,
costPer1kTokens: 0.005,
},
{
model: anthropic('claude-sonnet-4-20250514'),
name: 'claude-sonnet-4',
tier: 'standard',
maxRetries: 2,
timeoutMs: 30000,
costPer1kTokens: 0.003,
},
],
premium: [
{
model: openai('o3'),
name: 'o3',
tier: 'premium',
maxRetries: 1,
timeoutMs: 60000,
costPer1kTokens: 0.03,
},
{
model: anthropic('claude-opus-4-20250514'),
name: 'claude-opus-4',
tier: 'premium',
maxRetries: 1,
timeoutMs: 60000,
costPer1kTokens: 0.015,
},
],
};
interface RouteDecision {
model: LanguageModelV1;
modelName: string;
tier: ModelTier;
fallbackChain: string[];
}
export function routeModel(
complexity: 'simple' | 'medium' | 'complex',
preferredProvider?: 'openai' | 'anthropic' | 'google'
): RouteDecision {
const tierMap: Record<string, ModelTier> = {
simple: 'fast',
medium: 'standard',
complex: 'premium',
};
const tier = tierMap[complexity];
const models = MODEL_REGISTRY[tier];
const preferred = preferredProvider
? models.find((m) => m.name.startsWith(preferredProvider))
: models[0];
const selected = preferred || models[0];
const fallbackChain = models
.filter((m) => m.name !== selected.name)
.map((m) => m.name);
return {
model: selected.model,
modelName: selected.name,
tier,
fallbackChain,
};
}
export { MODEL_REGISTRY };
export type { ModelConfig, ModelTier };
带降级的流式Route Handler
// app/api/chat/routed/route.ts
import { streamText } from 'ai';
import { routeModel, MODEL_REGISTRY } from '@/lib/ai/model-router';
import { NextRequest } from 'next/server';
export const runtime = 'edge';
export const maxDuration = 60;
interface ChatRequest {
messages: Array<{ role: string; content: string }>;
complexity?: 'simple' | 'medium' | 'complex';
provider?: 'openai' | 'anthropic';
}
export async function POST(request: NextRequest) {
const body: ChatRequest = await request.json();
const { messages, complexity = 'medium', provider } = body;
const route = routeModel(complexity, provider);
try {
const result = streamText({
model: route.model,
system: '你是一个有帮助的AI助手。',
messages,
maxTokens: 4096,
abortSignal: request.signal,
});
return result.toDataStreamResponse({
headers: {
'X-Model-Name': route.modelName,
'X-Model-Tier': route.tier,
'X-Fallback-Chain': route.fallbackChain.join(','),
},
});
} catch (error) {
// 降级到备选模型
const fallbackModelName = route.fallbackChain[0];
const allModels = Object.values(MODEL_REGISTRY).flat();
const fallback = allModels.find((m) => m.name === fallbackModelName);
if (!fallback) {
return new Response(
JSON.stringify({ error: '所有模型均不可用' }),
{ status: 503, headers: { 'Content-Type': 'application/json' } }
);
}
const result = streamText({
model: fallback.model,
system: '你是一个有帮助的AI助手。',
messages,
maxTokens: 4096,
});
return result.toDataStreamResponse({
headers: {
'X-Model-Name': fallback.name,
'X-Model-Tier': fallback.tier,
'X-Fallback-Used': 'true',
},
});
}
}
Pattern 5: 对话状态与上下文管理
AI聊天的核心挑战之一是上下文管理——对话历史越长,token消耗越大,成本指数增长。需要区分短期上下文窗口和长期记忆。
对话状态管理
// lib/ai/conversation-state.ts
import { Redis } from '@upstash/redis';
const redis = new Redis({
url: process.env.REDIS_URL!,
token: process.env.REDIS_TOKEN!,
});
interface ConversationMessage {
id: string;
role: 'system' | 'user' | 'assistant';
content: string;
timestamp: number;
tokenCount: number;
}
interface ConversationState {
id: string;
userId: string;
title: string;
messages: ConversationMessage[];
summary?: string;
totalTokens: number;
createdAt: number;
updatedAt: number;
}
const MAX_CONTEXT_TOKENS = 8000;
const SUMMARY_THRESHOLD = 6000;
function estimateTokenCount(text: string): number {
return Math.ceil(text.length / 3.5);
}
export async function getConversation(conversationId: string): Promise<ConversationState | null> {
const data = await redis.get<ConversationState>(
`conversation:${conversationId}`
);
return data;
}
export async function saveConversation(state: ConversationState): Promise<void> {
state.updatedAt = Date.now();
await redis.set(`conversation:${state.id}`, JSON.stringify(state), {
ex: 86400 * 30, // 30天过期
});
}
export async function createContextWindow(
conversationId: string
): Promise<ConversationMessage[]> {
const conversation = await getConversation(conversationId);
if (!conversation) return [];
const messages = conversation.messages;
// 如果总token数在阈值内,直接返回
const totalTokens = messages.reduce((sum, m) => sum + m.tokenCount, 0);
if (totalTokens <= MAX_CONTEXT_TOKENS) {
return messages;
}
// 如果有摘要,用摘要 + 最近消息
if (conversation.summary) {
const summaryMessage: ConversationMessage = {
id: 'summary',
role: 'system',
content: `以下是之前对话的摘要:\n${conversation.summary}`,
timestamp: Date.now(),
tokenCount: estimateTokenCount(conversation.summary),
};
// 从最新消息往前加,直到token数接近上限
const recentMessages: ConversationMessage[] = [];
let currentTokens = summaryMessage.tokenCount;
for (let i = messages.length - 1; i >= 0; i--) {
if (currentTokens + messages[i].tokenCount > MAX_CONTEXT_TOKENS) break;
recentMessages.unshift(messages[i]);
currentTokens += messages[i].tokenCount;
}
return [summaryMessage, ...recentMessages];
}
// 没有摘要,保留最近的消息
const result: ConversationMessage[] = [];
let currentTokens = 0;
for (let i = messages.length - 1; i >= 0; i--) {
if (currentTokens + messages[i].tokenCount > MAX_CONTEXT_TOKENS) break;
result.unshift(messages[i]);
currentTokens += messages[i].tokenCount;
}
return result;
}
export async function generateSummary(
conversationId: string,
messages: ConversationMessage[]
): Promise<string> {
const conversationText = messages
.map((m) => `${m.role}: ${m.content}`)
.join('\n');
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: 'gpt-4o-mini',
messages: [
{
role: 'system',
content: '请用2-3句话总结以下对话的关键信息和结论。',
},
{ role: 'user', content: conversationText },
],
max_tokens: 200,
}),
});
const data = await response.json();
const summary = data.choices?.[0]?.message?.content || '';
const conversation = await getConversation(conversationId);
if (conversation) {
conversation.summary = summary;
await saveConversation(conversation);
}
return summary;
}
export { estimateTokenCount, MAX_CONTEXT_TOKENS, SUMMARY_THRESHOLD };
export type { ConversationMessage, ConversationState };
对话管理Route Handler
// app/api/chat/managed/route.ts
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { NextRequest } from 'next/server';
import {
getConversation,
saveConversation,
createContextWindow,
generateSummary,
estimateTokenCount,
SUMMARY_THRESHOLD,
} from '@/lib/ai/conversation-state';
import type { ConversationMessage, ConversationState } from '@/lib/ai/conversation-state';
export const runtime = 'edge';
export const maxDuration = 60;
export async function POST(request: NextRequest) {
const { message, conversationId, userId } = await request.json();
let conversation = await getConversation(conversationId);
if (!conversation) {
conversation = {
id: conversationId,
userId,
title: message.slice(0, 30),
messages: [],
totalTokens: 0,
createdAt: Date.now(),
updatedAt: Date.now(),
};
}
const userMsg: ConversationMessage = {
id: crypto.randomUUID(),
role: 'user',
content: message,
timestamp: Date.now(),
tokenCount: estimateTokenCount(message),
};
conversation.messages.push(userMsg);
conversation.totalTokens += userMsg.tokenCount;
// 如果超过摘要阈值,异步生成摘要
if (conversation.totalTokens > SUMMARY_THRESHOLD && !conversation.summary) {
generateSummary(conversationId, conversation.messages.slice(0, -3)).catch(
console.error
);
}
const contextMessages = await createContextWindow(conversationId);
const result = streamText({
model: openai('gpt-4o'),
system: '你是一个有帮助的AI助手,用简洁准确的方式回答问题。',
messages: contextMessages.map((m) => ({
role: m.role as 'system' | 'user' | 'assistant',
content: m.content,
})),
maxTokens: 4096,
onFinish: async ({ text, usage }) => {
const assistantMsg: ConversationMessage = {
id: crypto.randomUUID(),
role: 'assistant',
content: text,
timestamp: Date.now(),
tokenCount: usage?.totalTokens || estimateTokenCount(text),
};
conversation!.messages.push(assistantMsg);
conversation!.totalTokens += assistantMsg.tokenCount;
await saveConversation(conversation!);
},
});
return result.toDataStreamResponse();
}
Pattern 6: 生产部署与性能优化
并发连接管理
// lib/ai/connection-pool.ts
import { Redis } from '@upstash/redis';
const redis = new Redis({
url: process.env.REDIS_URL!,
token: process.env.REDIS_TOKEN!,
});
const MAX_CONCURRENT_CONNECTIONS = 100;
const CONNECTION_TTL_SECONDS = 120;
export async function acquireConnection(userId: string): Promise<boolean> {
const key = `conn:${userId}`;
const current = await redis.incr(key);
if (current === 1) {
await redis.expire(key, CONNECTION_TTL_SECONDS);
}
if (current > MAX_CONCURRENT_CONNECTIONS) {
await redis.decr(key);
return false;
}
return true;
}
export async function releaseConnection(userId: string): Promise<void> {
const key = `conn:${userId}`;
const current = await redis.decr(key);
if (current <= 0) {
await redis.del(key);
}
}
export async function getConnectionCount(userId: string): Promise<number> {
const count = await redis.get<number>(`conn:${userId}`);
return count || 0;
}
限流中间件
// lib/ai/rate-limiter.ts
import { Redis } from '@upstash/redis';
import { NextRequest, NextResponse } from 'next/server';
const redis = new Redis({
url: process.env.REDIS_URL!,
token: process.env.REDIS_TOKEN!,
});
interface RateLimitConfig {
windowMs: number;
maxRequests: number;
}
const RATE_LIMITS: Record<string, RateLimitConfig> = {
free: { windowMs: 60000, maxRequests: 10 },
pro: { windowMs: 60000, maxRequests: 60 },
enterprise: { windowMs: 60000, maxRequests: 300 },
};
export async function rateLimitMiddleware(
request: NextRequest,
userId: string,
tier: string = 'free'
): Promise<NextResponse | null> {
const config = RATE_LIMITS[tier] || RATE_LIMITS.free;
const key = `rate:${userId}:${Math.floor(Date.now() / config.windowMs)}`;
const current = await redis.incr(key);
if (current === 1) {
await redis.expire(key, Math.ceil(config.windowMs / 1000));
}
if (current > config.maxRequests) {
return NextResponse.json(
{
error: '请求过于频繁,请稍后再试',
retryAfter: config.windowMs / 1000,
},
{
status: 429,
headers: {
'Retry-After': String(Math.ceil(config.windowMs / 1000)),
'X-RateLimit-Limit': String(config.maxRequests),
'X-RateLimit-Remaining': '0',
},
}
);
}
return null;
}
带完整中间件的Chat API
// app/api/chat/production/route.ts
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { NextRequest, NextResponse } from 'next/server';
import { acquireConnection, releaseConnection } from '@/lib/ai/connection-pool';
import { rateLimitMiddleware } from '@/lib/ai/rate-limiter';
import { routeModel } from '@/lib/ai/model-router';
export const runtime = 'edge';
export const maxDuration = 60;
export async function POST(request: NextRequest) {
const body = await request.json();
const { messages, userId = 'anonymous', complexity = 'medium' } = body;
// 1. 限流检查
const rateLimitResponse = await rateLimitMiddleware(request, userId);
if (rateLimitResponse) return rateLimitResponse;
// 2. 连接数检查
const acquired = await acquireConnection(userId);
if (!acquired) {
return NextResponse.json(
{ error: '并发连接数超限,请稍后再试' },
{ status: 429 }
);
}
try {
// 3. 模型路由
const route = routeModel(complexity);
// 4. 流式生成
const result = streamText({
model: route.model,
system: '你是一个有帮助的AI助手。',
messages,
maxTokens: 4096,
abortSignal: request.signal,
});
// 5. 包装响应,确保连接释放
const response = result.toDataStreamResponse({
headers: {
'X-Model-Name': route.modelName,
'X-Request-Id': crypto.randomUUID(),
},
});
// Clone response to ensure cleanup runs
const [body1, body2] = [response.body!, response.body!];
const finalResponse = new Response(body1, {
status: response.status,
headers: response.headers,
});
// 监听流结束释放连接
const reader = body2.getReader();
(async () => {
try {
while (true) {
const { done } = await reader.read();
if (done) break;
}
} finally {
await releaseConnection(userId);
}
})();
return finalResponse;
} catch (error) {
await releaseConnection(userId);
return NextResponse.json(
{ error: `AI服务异常: ${String(error)}` },
{ status: 500 }
);
}
}
健康检查与监控
// app/api/health/ai/route.ts
import { NextResponse } from 'next/server';
interface HealthCheck {
service: string;
status: 'healthy' | 'degraded' | 'down';
latencyMs: number;
error?: string;
}
async function checkOpenAI(): Promise<HealthCheck> {
const start = Date.now();
try {
const res = await fetch('https://api.openai.com/v1/models', {
headers: { Authorization: `Bearer ${process.env.OPENAI_API_KEY}` },
signal: AbortSignal.timeout(5000),
});
return {
service: 'openai',
status: res.ok ? 'healthy' : 'degraded',
latencyMs: Date.now() - start,
};
} catch (error) {
return {
service: 'openai',
status: 'down',
latencyMs: Date.now() - start,
error: String(error),
};
}
}
async function checkAnthropic(): Promise<HealthCheck> {
const start = Date.now();
try {
const res = await fetch('https://api.anthropic.com/v1/models', {
headers: {
'x-api-key': process.env.ANTHROPIC_API_KEY!,
'anthropic-version': '2023-06-01',
},
signal: AbortSignal.timeout(5000),
});
return {
service: 'anthropic',
status: res.ok ? 'healthy' : 'degraded',
latencyMs: Date.now() - start,
};
} catch (error) {
return {
service: 'anthropic',
status: 'down',
latencyMs: Date.now() - start,
error: String(error),
};
}
}
export async function GET() {
const checks = await Promise.all([checkOpenAI(), checkAnthropic()]);
const overallStatus = checks.every((c) => c.status === 'healthy')
? 'healthy'
: checks.some((c) => c.status === 'healthy')
? 'degraded'
: 'down';
return NextResponse.json({
status: overallStatus,
timestamp: new Date().toISOString(),
checks,
});
}
5个常见坑及解决方案
坑1:SSE连接在Nginx/CDN层被缓冲
现象:流式响应变成一次性返回,用户看到的是等了很久然后整段文字出现。
原因:Nginx默认开启proxy_buffering,CDN也会缓冲SSE响应。
解决:
# nginx.conf
location /api/chat {
proxy_pass http://nextjs_backend;
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;
proxy_read_timeout 300s;
}
// 在Route Handler中添加响应头
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
'X-Accel-Buffering': 'no', // 告诉Nginx不要缓冲
Connection: 'keep-alive',
},
});
坑2:Edge Runtime下无法使用Node.js原生模块
现象:部署到Vercel Edge Functions时报错Module not found。
原因:Edge Runtime不支持Node.js的net、fs、child_process等模块。
解决:使用Edge兼容的替代库,如@upstash/redis替代ioredis,fetch替代http。
坑3:useChat的messages状态与外部状态不同步
现象:在useChat外维护了一份messages,但两者不同步。
原因:useChat内部管理自己的messages状态,外部修改不会反映到内部。
解决:使用useChat的onFinish回调同步状态,或使用setMessages方法。
const { messages, setMessages, ...rest } = useChat({
onFinish: (message) => {
// 同步到外部状态
externalStore.update(message);
},
});
// 从外部恢复状态
useEffect(() => {
setMessages(loadedMessages);
}, [loadedMessages]);
坑4:流式响应中断后无法恢复
现象:网络抖动导致SSE连接断开,已接收的内容丢失。
原因:SSE没有内置的断点续传机制(与gRPC不同)。
解决:客户端缓存已接收内容,重连时将已有内容作为上下文重新请求。
// 客户端断线恢复
async function recoverStream(
conversationId: string,
lastReceivedContent: string
) {
const response = await fetch('/api/chat/recover', {
method: 'POST',
body: JSON.stringify({
conversationId,
lastContent: lastReceivedContent,
}),
});
return response.body;
}
坑5:大量并发SSE连接导致内存泄漏
现象:服务器内存持续增长,最终OOM。
原因:未正确关闭ReadableStream和AbortController。
解决:确保所有流都有超时和清理机制。
const stream = new ReadableStream({
start(controller) {
const timeout = setTimeout(() => {
controller.close();
}, 60000); // 60秒超时
// ... 流式逻辑
return () => clearTimeout(timeout);
},
});
10个常见报错排查
| # | 报错信息 | 原因 | 解决方案 |
|---|---|---|---|
| 1 | TypeError: response.body is null |
Route Handler未返回流式响应 | 确认返回ReadableStream或使用toDataStreamResponse() |
| 2 | AI_APICallError: 429 Too Many Requests |
LLM API限流 | 实现指数退避重试,或切换到备选模型 |
| 3 | AI_APICallError: context_length_exceeded |
对话历史超过模型上下文窗口 | 实现上下文窗口裁剪或摘要压缩 |
| 4 | Error: Invalid SSE data |
SSE数据格式不正确 | 检查data:前缀和\n\n分隔符 |
| 5 | AbortError: The operation was aborted |
用户取消或请求超时 | 正确处理AbortSignal,清理资源 |
| 6 | Error: Cannot read properties of undefined (reading 'delta') |
LLM返回了非标准格式的chunk | 添加防御性解析,跳过格式异常的chunk |
| 7 | RuntimeError: Edge Runtime does not support Node.js API |
在Edge Runtime中使用了Node.js API | 切换到Node.js Runtime或使用Edge兼容库 |
| 8 | Error: Maximum call stack size exceeded |
递归处理流式数据导致栈溢出 | 使用迭代替代递归处理chunk |
| 9 | TypeError: Failed to execute 'fetch' on 'Window' |
浏览器CORS限制 | 确保API路由和前端同源,或配置CORS头 |
| 10 | Error: Stream ended unexpectedly |
服务端流提前关闭 | 添加心跳机制检测连接状态,实现自动重连 |
进阶优化技巧
1. 流式Markdown渲染
// components/StreamMarkdown.tsx
'use client';
import { memo, useMemo } from 'react';
interface StreamMarkdownProps {
content: string;
isStreaming: boolean;
}
const StreamMarkdown = memo(function StreamMarkdown({
content,
isStreaming,
}: StreamMarkdownProps) {
const html = useMemo(() => {
// 简单的Markdown到HTML转换
return content
.replace(/```(\w*)\n([\s\S]*?)```/g, '<pre><code class="language-$1">$2</code></pre>')
.replace(/`([^`]+)`/g, '<code>$1</code>')
.replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>')
.replace(/\*([^*]+)\*/g, '<em>$1</em>')
.replace(/^### (.+)$/gm, '<h3>$1</h3>')
.replace(/^## (.+)$/gm, '<h2>$1</h2>')
.replace(/^# (.+)$/gm, '<h1>$1</h1>')
.replace(/\n/g, '<br/>');
}, [content]);
return (
<div className="prose prose-sm max-w-none">
<div dangerouslySetInnerHTML={{ __html: html }} />
{isStreaming && (
<span className="inline-block w-2 h-4 bg-blue-600 animate-pulse ml-0.5" />
)}
</div>
);
});
export default StreamMarkdown;
2. 预测性预取
// lib/ai/prefetch.ts
export function predictNextQuery(messages: Array<{ role: string; content: string }>): string | null {
const lastMessage = messages[messages.length - 1];
if (!lastMessage || lastMessage.role !== 'assistant') return null;
// 如果AI回复中包含代码,预测用户可能想运行
if (lastMessage.content.includes('```')) {
return '请帮我运行这段代码';
}
// 如果AI回复中包含列表,预测用户可能想展开
if (lastMessage.content.includes('1.') || lastMessage.content.includes('- ')) {
return '请详细解释第一点';
}
return null;
}
3. 流式响应缓存
// lib/ai/stream-cache.ts
import { Redis } from '@upstash/redis';
const redis = new Redis({
url: process.env.REDIS_URL!,
token: process.env.REDIS_TOKEN!,
});
export async function getCachedStream(queryHash: string): Promise<string | null> {
return redis.get<string>(`stream-cache:${queryHash}`);
}
export async function cacheStreamResponse(
queryHash: string,
response: string,
ttlSeconds: number = 3600
): Promise<void> {
await redis.set(`stream-cache:${queryHash}`, response, { ex: ttlSeconds });
}
export function computeQueryHash(
messages: Array<{ role: string; content: string }>,
model: string
): string {
const raw = JSON.stringify({ messages, model });
// 简单hash,生产环境用crypto.subtle.digest
let hash = 0;
for (let i = 0; i < raw.length; i++) {
const char = raw.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash |= 0;
}
return hash.toString(36);
}
对比分析:SSE vs WebSocket vs 长轮询
| 维度 | SSE | WebSocket | 长轮询 |
|---|---|---|---|
| 传输方向 | 单向(服务端→客户端) | 双向 | 单向 |
| 协议 | HTTP/1.1+ | ws/wss | HTTP |
| 自动重连 | 浏览器原生支持 | 需手动实现 | 每次都是新请求 |
| 代理/CDN | 友好(标准HTTP) | 可能被拦截 | 完全兼容 |
| 二进制数据 | 不支持 | 支持 | 不支持 |
| 帧开销 | 较高(文本格式) | 低(2字节) | 高(每次HTTP头) |
| 浏览器兼容 | IE不支持 | 广泛支持 | 广泛支持 |
| 连接数限制 | 6个/域名(HTTP/1.1) | 无限制 | 无实际限制 |
| LLM适用性 | ★★★★★ | ★★★ | ★★ |
| 实现复杂度 | ★★ | ★★★★ | ★ |
结论:LLM流式输出是典型的单向推送场景,SSE是最佳选择。只有在需要双向实时交互(如语音对话、实时协作编辑)时才考虑WebSocket。
在线工具推荐
相关文章
- Next.js流式SSR实战:从Suspense到渐进式渲染的5种生产模式 - 深入理解Next.js Streaming SSR原理
- Next.js App Router性能优化指南 - App Router性能优化全攻略
- Python SSE流式输出LLM - 后端SSE实现参考
外部资源
- Vercel AI SDK官方文档 - AI SDK完整API参考
- MDN: Server-Sent Events - SSE协议规范
总结
Next.js 15流式AI聊天的6种生产模式各有适用场景:
| 模式 | 适用场景 | 复杂度 | 推荐度 |
|---|---|---|---|
| SSE流式响应 | 需要完全控制流式协议 | ★★★ | ★★★★ |
| React Server Actions | 类型安全优先,快速开发 | ★★ | ★★★★ |
| Vercel AI SDK | 快速原型,多LLM支持 | ★ | ★★★★★ |
| 多模型路由 | 生产级容灾,成本优化 | ★★★★ | ★★★★ |
| 对话状态管理 | 长对话,上下文敏感 | ★★★★ | ★★★★ |
| 生产部署优化 | 企业级上线 | ★★★★★ | ★★★★★ |
核心建议:从Vercel AI SDK开始快速验证,逐步引入多模型路由和状态管理,最后完善生产部署。Next.js 15的流式AI聊天不再是技术难题,关键是选对模式、避开常见坑。
本站提供浏览器本地工具,免注册即可试用 →