Next.js 15流式AI聊天:从SSE到React Server Actions的6种生产模式

前端工程

AI聊天为什么总让人觉得卡

你打开一个AI聊天页面,输入问题,然后盯着空白对话框等了8秒——LLM终于吐出一大段文字,但用户早以为页面挂了。传统请求-响应模式下,用户感知延迟 = 网络延迟 + LLM首token时间 + 全量生成时间,体感就是"卡"。流式响应(Streaming)把等待拆成一个个token逐步推送,用户在0.5秒内看到第一个字,感知延迟降低80%。

Next.js 15在流式AI聊天场景下提供了从底层SSE到高层React Server Actions的完整工具链。本文将带你完成SSE流式响应→React Server Actions + AI→Vercel AI SDK集成→多模型路由与降级→对话状态与上下文管理→生产部署与性能优化的6种生产模式,从协议到上线,一步不落。


核心要点

  • SSE是LLM流式输出的最佳传输协议,Next.js Route Handler原生支持
  • React Server Actions让AI调用无需手写API端点,类型安全零样板
  • Vercel AI SDK统一了OpenAI/Anthropic/Google等多家LLM的流式接口
  • 多模型路由实现成本优化和容灾降级,GPT-4o挂了自动切Claude
  • 对话状态管理需要区分短期上下文和长期记忆,避免token爆炸
  • 生产部署要处理并发连接数、超时、背压和错误恢复

目录

  1. 流式AI聊天架构全景
  2. Pattern 1: SSE流式响应实现
  3. Pattern 2: React Server Actions + AI
  4. Pattern 3: Vercel AI SDK集成
  5. Pattern 4: 多模型路由与降级
  6. Pattern 5: 对话状态与上下文管理
  7. Pattern 6: 生产部署与性能优化
  8. 5个常见坑及解决方案
  9. 10个常见报错排查
  10. 进阶优化技巧
  11. 对比分析:SSE vs WebSocket vs 长轮询
  12. 在线工具推荐
  13. 总结

流式AI聊天架构全景

┌─────────────────────────────────────────────────────────────┐
│                    Next.js 15 AI Chat 架构                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    SSE/Stream    ┌──────────────────────┐    │
│  │  Client   │ ◄────────────── │  Route Handler /      │    │
│  │  Chat UI  │                 │  Server Action        │    │
│  │          │ ──────────────► │                        │    │
│  └──────────┘    POST请求      └──────────┬───────────┘    │
│                                            │               │
│                              ┌─────────────┼──────────┐    │
│                              │             │          │    │
│                              ▼             ▼          ▼    │
│                        ┌──────────┐ ┌──────────┐ ┌──────┐ │
│                        │ OpenAI   │ │ Anthropic│ │ 本地  │ │
│                        │ GPT-4o   │ │ Claude   │ │ Ollama│ │
│                        └──────────┘ └──────────┘ └──────┘ │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              对话状态层 (Conversation State)          │   │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐   │   │
│  │  │ Context │  │ History  │  │ Long-term Memory │   │   │
│  │  │ Window  │  │ Store    │  │ (Vector DB)      │   │   │
│  │  └─────────┘  └──────────┘  └──────────────────┘   │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

技术选型决策树

需要流式AI聊天?
├── 快速原型 → Vercel AI SDK (Pattern 3)
├── 完全控制 → SSE + Route Handler (Pattern 1)
├── 类型安全优先 → React Server Actions (Pattern 2)
├── 多模型需求 → 多模型路由 (Pattern 4)
└── 企业级生产 → 全部组合 + 状态管理 (Pattern 5+6)

Pattern 1: SSE流式响应实现

SSE(Server-Sent Events)是LLM流式输出的标准传输协议。Next.js 15的Route Handler原生支持ReadableStream,可以直接返回SSE格式的流式响应。

基础SSE Route Handler

// app/api/chat/sse/route.ts
import { NextRequest } from 'next/server';

export const runtime = 'edge';
export const maxDuration = 60;

interface ChatMessage {
  role: 'system' | 'user' | 'assistant';
  content: string;
}

export async function POST(request: NextRequest) {
  const { messages }: { messages: ChatMessage[] } = await request.json();

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      try {
        const response = await fetch('https://api.openai.com/v1/chat/completions', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
          },
          body: JSON.stringify({
            model: 'gpt-4o',
            messages,
            stream: true,
          }),
        });

        if (!response.ok) {
          const errorData = await response.text();
          controller.enqueue(
            encoder.encode(`data: ${JSON.stringify({ error: errorData })}\n\n`)
          );
          controller.close();
          return;
        }

        const reader = response.body!.getReader();
        const decoder = new TextDecoder();

        while (true) {
          const { done, value } = await reader.read();
          if (done) break;

          const chunk = decoder.decode(value, { stream: true });
          const lines = chunk.split('\n').filter((line) => line.startsWith('data: '));

          for (const line of lines) {
            const data = line.slice(6);
            if (data === '[DONE]') {
              controller.enqueue(encoder.encode('data: [DONE]\n\n'));
              continue;
            }

            try {
              const parsed = JSON.parse(data);
              const content = parsed.choices?.[0]?.delta?.content;
              if (content) {
                controller.enqueue(
                  encoder.encode(`data: ${JSON.stringify({ content })}\n\n`)
                );
              }
            } catch {
              // skip malformed chunks
            }
          }
        }

        controller.close();
      } catch (error) {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({ error: String(error) })}\n\n`)
        );
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
}

客户端SSE消费

// components/ChatSSE.tsx
'use client';

import { useState, useRef, useCallback } from 'react';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

export default function ChatSSE() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async () => {
    if (!input.trim() || isStreaming) return;

    const userMessage: Message = {
      id: crypto.randomUUID(),
      role: 'user',
      content: input.trim(),
    };

    const assistantMessage: Message = {
      id: crypto.randomUUID(),
      role: 'assistant',
      content: '',
    };

    setMessages((prev) => [...prev, userMessage, assistantMessage]);
    setInput('');
    setIsStreaming(true);

    const abortController = new AbortController();
    abortControllerRef.current = abortController;

    try {
      const response = await fetch('/api/chat/sse', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: [...messages, userMessage].map((m) => ({
            role: m.role,
            content: m.content,
          })),
        }),
        signal: abortController.signal,
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const reader = response.body!.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value, { stream: true });
        const lines = chunk.split('\n').filter((line) => line.startsWith('data: '));

        for (const line of lines) {
          const data = line.slice(6);
          if (data === '[DONE]') continue;

          try {
            const parsed = JSON.parse(data);
            if (parsed.error) {
              console.error('SSE error:', parsed.error);
              continue;
            }
            if (parsed.content) {
              setMessages((prev) =>
                prev.map((m) =>
                  m.id === assistantMessage.id
                    ? { ...m, content: m.content + parsed.content }
                    : m
                )
              );
            }
          } catch {
            // skip malformed data
          }
        }
      }
    } catch (error) {
      if ((error as Error).name !== 'AbortError') {
        console.error('Stream error:', error);
      }
    } finally {
      setIsStreaming(false);
      abortControllerRef.current = null;
    }
  }, [input, messages, isStreaming]);

  const stopStreaming = useCallback(() => {
    abortControllerRef.current?.abort();
  }, []);

  return (
    <div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((msg) => (
          <div
            key={msg.id}
            className={`p-3 rounded-lg ${
              msg.role === 'user'
                ? 'bg-blue-600 text-white ml-auto max-w-[80%]'
                : 'bg-gray-100 text-gray-900 mr-auto max-w-[80%]'
            }`}
          >
            <pre className="whitespace-pre-wrap font-sans text-sm">{msg.content}</pre>
          </div>
        ))}
      </div>

      <div className="flex gap-2">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && !e.shiftKey && sendMessage()}
          placeholder="输入消息..."
          className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
          disabled={isStreaming}
        />
        <button
          onClick={isStreaming ? stopStreaming : sendMessage}
          className={`px-4 py-2 rounded-lg font-medium ${
            isStreaming
              ? 'bg-red-500 hover:bg-red-600 text-white'
              : 'bg-blue-600 hover:bg-blue-700 text-white'
          }`}
        >
          {isStreaming ? '停止' : '发送'}
        </button>
      </div>
    </div>
  );
}

Pattern 2: React Server Actions + AI

React Server Actions让AI调用无需手写API端点,直接在Server Component中定义异步函数,客户端通过useActionState调用,类型安全零样板。

Server Action定义

// app/actions/chat-action.ts
'use server';

import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

interface ChatState {
  messages: Array<{ role: 'user' | 'assistant'; content: string }>;
  error?: string;
}

export async function chatAction(
  prevState: ChatState,
  formData: FormData
): Promise<ChatState> {
  const userInput = formData.get('message') as string;
  if (!userInput?.trim()) {
    return { ...prevState, error: '消息不能为空' };
  }

  const newMessages = [
    ...prevState.messages,
    { role: 'user' as const, content: userInput.trim() },
  ];

  try {
    const result = await streamText({
      model: openai('gpt-4o'),
      messages: newMessages,
    });

    const text = await result.text;

    return {
      messages: [
        ...newMessages,
        { role: 'assistant' as const, content: text },
      ],
    };
  } catch (error) {
    return {
      ...prevState,
      messages: newMessages,
      error: `AI调用失败: ${String(error)}`,
    };
  }
}

流式Server Action

// app/actions/streaming-chat-action.ts
'use server';

import { createStreamableValue } from 'ai/rsc';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

export async function streamingChatAction(messages: Array<{ role: 'user' | 'assistant'; content: string }>) {
  const streamableValue = createStreamableValue('');

  (async () => {
    try {
      const result = await streamText({
        model: openai('gpt-4o'),
        messages,
      });

      for await (const chunk of result.textStream) {
        streamableValue.update(chunk);
      }

      streamableValue.done();
    } catch (error) {
      streamableValue.error(String(error));
    }
  })();

  return streamableValue.value;
}

客户端消费流式Server Action

// components/ChatServerAction.tsx
'use client';

import { useActionState } from 'react';
import { readStreamableValue } from 'ai/rsc';
import { streamingChatAction } from '@/app/actions/streaming-chat-action';
import { useState, useRef, useCallback } from 'react';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

export default function ChatServerAction() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const handleSubmit = useCallback(async () => {
    if (!input.trim() || isStreaming) return;

    const userMessage: Message = {
      id: crypto.randomUUID(),
      role: 'user',
      content: input.trim(),
    };

    const assistantMessage: Message = {
      id: crypto.randomUUID(),
      role: 'assistant',
      content: '',
    };

    const updatedMessages = [...messages, userMessage];
    setMessages([...updatedMessages, assistantMessage]);
    setInput('');
    setIsStreaming(true);

    try {
      const streamValue = await streamingChatAction(
        updatedMessages.map((m) => ({ role: m.role, content: m.content }))
      );

      for await (const chunk of readStreamableValue(streamValue)) {
        if (chunk) {
          setMessages((prev) =>
            prev.map((m) =>
              m.id === assistantMessage.id
                ? { ...m, content: m.content + chunk }
                : m
            )
          );
        }
      }
    } catch (error) {
      console.error('Server Action error:', error);
    } finally {
      setIsStreaming(false);
    }
  }, [input, messages, isStreaming]);

  return (
    <div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((msg) => (
          <div
            key={msg.id}
            className={`p-3 rounded-lg ${
              msg.role === 'user'
                ? 'bg-blue-600 text-white ml-auto max-w-[80%]'
                : 'bg-gray-100 text-gray-900 mr-auto max-w-[80%]'
            }`}
          >
            <pre className="whitespace-pre-wrap font-sans text-sm">{msg.content}</pre>
          </div>
        ))}
      </div>

      <div className="flex gap-2">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && !e.shiftKey && handleSubmit()}
          placeholder="输入消息..."
          className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
          disabled={isStreaming}
        />
        <button
          onClick={handleSubmit}
          disabled={isStreaming}
          className="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white rounded-lg font-medium disabled:opacity-50"
        >
          {isStreaming ? '生成中...' : '发送'}
        </button>
      </div>
    </div>
  );
}

Pattern 3: Vercel AI SDK集成

Vercel AI SDK(ai包)是Next.js流式AI聊天的官方推荐方案,统一了多家LLM的流式接口,提供useChat等开箱即用的Hook。

安装与配置

npm install ai @ai-sdk/openai @ai-sdk/anthropic @ai-sdk/google

Route Handler(AI SDK版)

// app/api/chat/route.ts
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

export const runtime = 'edge';
export const maxDuration = 60;

export async function POST(request: Request) {
  const { messages } = await request.json();

  const result = streamText({
    model: openai('gpt-4o'),
    system: '你是一个有帮助的AI助手,用简洁准确的方式回答问题。',
    messages,
    maxTokens: 4096,
    temperature: 0.7,
  });

  return result.toDataStreamResponse();
}

useChat Hook(最简实现)

// components/ChatAISDK.tsx
'use client';

import { useChat } from '@ai-sdk/react';

export default function ChatAISDK() {
  const { messages, input, handleInputChange, handleSubmit, isLoading, stop } =
    useChat({
      api: '/api/chat',
      onError: (error) => {
        console.error('Chat error:', error);
      },
      onFinish: (message) => {
        console.log('Finished:', message.content.length, 'chars');
      },
    });

  return (
    <div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((msg) => (
          <div
            key={msg.id}
            className={`p-3 rounded-lg ${
              msg.role === 'user'
                ? 'bg-blue-600 text-white ml-auto max-w-[80%]'
                : 'bg-gray-100 text-gray-900 mr-auto max-w-[80%]'
            }`}
          >
            <div className="whitespace-pre-wrap text-sm">{msg.content}</div>
          </div>
        ))}

        {isLoading && messages[messages.length - 1]?.role === 'user' && (
          <div className="bg-gray-100 text-gray-900 mr-auto max-w-[80%] p-3 rounded-lg">
            <div className="flex items-center gap-2 text-sm text-gray-500">
              <div className="animate-pulse">●</div>
              <div className="animate-pulse delay-75">●</div>
              <div className="animate-pulse delay-150">●</div>
            </div>
          </div>
        )}
      </div>

      <form onSubmit={handleSubmit} className="flex gap-2">
        <input
          value={input}
          onChange={handleInputChange}
          placeholder="输入消息..."
          className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
        />
        <button
          type={isLoading ? 'button' : 'submit'}
          onClick={isLoading ? stop : undefined}
          className={`px-4 py-2 rounded-lg font-medium ${
            isLoading
              ? 'bg-red-500 hover:bg-red-600 text-white'
              : 'bg-blue-600 hover:bg-blue-700 text-white'
          }`}
        >
          {isLoading ? '停止' : '发送'}
        </button>
      </form>
    </div>
  );
}

AI SDK高级配置

// app/api/chat/advanced/route.ts
import { openai } from '@ai-sdk/openai';
import { streamText, createDataStreamResponse } from 'ai';
import { z } from 'zod';

export async function POST(request: Request) {
  const { messages, conversationId } = await request.json();

  const result = streamText({
    model: openai('gpt-4o'),
    system: '你是一个有帮助的AI助手。',
    messages,
    maxTokens: 4096,
    temperature: 0.7,
    tools: {
      searchWeb: {
        description: '搜索互联网获取最新信息',
        parameters: z.object({
          query: z.string().describe('搜索关键词'),
        }),
        execute: async ({ query }) => {
          const res = await fetch(
            `https://api.search.example.com/search?q=${encodeURIComponent(query)}`
          );
          return res.json();
        },
      },
    },
    onChunk: async ({ chunk }) => {
      if (chunk.type === 'tool-call') {
        console.log(`[Chat ${conversationId}] Tool call:`, chunk.toolName);
      }
    },
    onFinish: async ({ response, usage }) => {
      console.log(`[Chat ${conversationId}] Tokens:`, usage);
    },
  });

  return result.toDataStreamResponse({
    headers: {
      'X-Conversation-Id': conversationId,
    },
  });
}

Pattern 4: 多模型路由与降级

生产环境中不能只依赖一个LLM供应商。多模型路由实现成本优化和容灾降级——GPT-4o挂了自动切Claude,简单问题用GPT-4o-mini省钱。

模型路由器

// lib/ai/model-router.ts
import { LanguageModelV1 } from 'ai';
import { openai } from '@ai-sdk/openai';
import { anthropic } from '@ai-sdk/anthropic';
import { google } from '@ai-sdk/google';

type ModelTier = 'fast' | 'standard' | 'premium';

interface ModelConfig {
  model: LanguageModelV1;
  name: string;
  tier: ModelTier;
  maxRetries: number;
  timeoutMs: number;
  costPer1kTokens: number;
}

const MODEL_REGISTRY: Record<ModelTier, ModelConfig[]> = {
  fast: [
    {
      model: openai('gpt-4o-mini'),
      name: 'gpt-4o-mini',
      tier: 'fast',
      maxRetries: 2,
      timeoutMs: 15000,
      costPer1kTokens: 0.00015,
    },
    {
      model: anthropic('claude-3-5-haiku-2024-10-22'),
      name: 'claude-3.5-haiku',
      tier: 'fast',
      maxRetries: 2,
      timeoutMs: 15000,
      costPer1kTokens: 0.00025,
    },
  ],
  standard: [
    {
      model: openai('gpt-4o'),
      name: 'gpt-4o',
      tier: 'standard',
      maxRetries: 2,
      timeoutMs: 30000,
      costPer1kTokens: 0.005,
    },
    {
      model: anthropic('claude-sonnet-4-20250514'),
      name: 'claude-sonnet-4',
      tier: 'standard',
      maxRetries: 2,
      timeoutMs: 30000,
      costPer1kTokens: 0.003,
    },
  ],
  premium: [
    {
      model: openai('o3'),
      name: 'o3',
      tier: 'premium',
      maxRetries: 1,
      timeoutMs: 60000,
      costPer1kTokens: 0.03,
    },
    {
      model: anthropic('claude-opus-4-20250514'),
      name: 'claude-opus-4',
      tier: 'premium',
      maxRetries: 1,
      timeoutMs: 60000,
      costPer1kTokens: 0.015,
    },
  ],
};

interface RouteDecision {
  model: LanguageModelV1;
  modelName: string;
  tier: ModelTier;
  fallbackChain: string[];
}

export function routeModel(
  complexity: 'simple' | 'medium' | 'complex',
  preferredProvider?: 'openai' | 'anthropic' | 'google'
): RouteDecision {
  const tierMap: Record<string, ModelTier> = {
    simple: 'fast',
    medium: 'standard',
    complex: 'premium',
  };

  const tier = tierMap[complexity];
  const models = MODEL_REGISTRY[tier];

  const preferred = preferredProvider
    ? models.find((m) => m.name.startsWith(preferredProvider))
    : models[0];

  const selected = preferred || models[0];
  const fallbackChain = models
    .filter((m) => m.name !== selected.name)
    .map((m) => m.name);

  return {
    model: selected.model,
    modelName: selected.name,
    tier,
    fallbackChain,
  };
}

export { MODEL_REGISTRY };
export type { ModelConfig, ModelTier };

带降级的流式Route Handler

// app/api/chat/routed/route.ts
import { streamText } from 'ai';
import { routeModel, MODEL_REGISTRY } from '@/lib/ai/model-router';
import { NextRequest } from 'next/server';

export const runtime = 'edge';
export const maxDuration = 60;

interface ChatRequest {
  messages: Array<{ role: string; content: string }>;
  complexity?: 'simple' | 'medium' | 'complex';
  provider?: 'openai' | 'anthropic';
}

export async function POST(request: NextRequest) {
  const body: ChatRequest = await request.json();
  const { messages, complexity = 'medium', provider } = body;

  const route = routeModel(complexity, provider);

  try {
    const result = streamText({
      model: route.model,
      system: '你是一个有帮助的AI助手。',
      messages,
      maxTokens: 4096,
      abortSignal: request.signal,
    });

    return result.toDataStreamResponse({
      headers: {
        'X-Model-Name': route.modelName,
        'X-Model-Tier': route.tier,
        'X-Fallback-Chain': route.fallbackChain.join(','),
      },
    });
  } catch (error) {
    // 降级到备选模型
    const fallbackModelName = route.fallbackChain[0];
    const allModels = Object.values(MODEL_REGISTRY).flat();
    const fallback = allModels.find((m) => m.name === fallbackModelName);

    if (!fallback) {
      return new Response(
        JSON.stringify({ error: '所有模型均不可用' }),
        { status: 503, headers: { 'Content-Type': 'application/json' } }
      );
    }

    const result = streamText({
      model: fallback.model,
      system: '你是一个有帮助的AI助手。',
      messages,
      maxTokens: 4096,
    });

    return result.toDataStreamResponse({
      headers: {
        'X-Model-Name': fallback.name,
        'X-Model-Tier': fallback.tier,
        'X-Fallback-Used': 'true',
      },
    });
  }
}

Pattern 5: 对话状态与上下文管理

AI聊天的核心挑战之一是上下文管理——对话历史越长,token消耗越大,成本指数增长。需要区分短期上下文窗口和长期记忆。

对话状态管理

// lib/ai/conversation-state.ts
import { Redis } from '@upstash/redis';

const redis = new Redis({
  url: process.env.REDIS_URL!,
  token: process.env.REDIS_TOKEN!,
});

interface ConversationMessage {
  id: string;
  role: 'system' | 'user' | 'assistant';
  content: string;
  timestamp: number;
  tokenCount: number;
}

interface ConversationState {
  id: string;
  userId: string;
  title: string;
  messages: ConversationMessage[];
  summary?: string;
  totalTokens: number;
  createdAt: number;
  updatedAt: number;
}

const MAX_CONTEXT_TOKENS = 8000;
const SUMMARY_THRESHOLD = 6000;

function estimateTokenCount(text: string): number {
  return Math.ceil(text.length / 3.5);
}

export async function getConversation(conversationId: string): Promise<ConversationState | null> {
  const data = await redis.get<ConversationState>(
    `conversation:${conversationId}`
  );
  return data;
}

export async function saveConversation(state: ConversationState): Promise<void> {
  state.updatedAt = Date.now();
  await redis.set(`conversation:${state.id}`, JSON.stringify(state), {
    ex: 86400 * 30, // 30天过期
  });
}

export async function createContextWindow(
  conversationId: string
): Promise<ConversationMessage[]> {
  const conversation = await getConversation(conversationId);
  if (!conversation) return [];

  const messages = conversation.messages;

  // 如果总token数在阈值内,直接返回
  const totalTokens = messages.reduce((sum, m) => sum + m.tokenCount, 0);
  if (totalTokens <= MAX_CONTEXT_TOKENS) {
    return messages;
  }

  // 如果有摘要,用摘要 + 最近消息
  if (conversation.summary) {
    const summaryMessage: ConversationMessage = {
      id: 'summary',
      role: 'system',
      content: `以下是之前对话的摘要:\n${conversation.summary}`,
      timestamp: Date.now(),
      tokenCount: estimateTokenCount(conversation.summary),
    };

    // 从最新消息往前加,直到token数接近上限
    const recentMessages: ConversationMessage[] = [];
    let currentTokens = summaryMessage.tokenCount;

    for (let i = messages.length - 1; i >= 0; i--) {
      if (currentTokens + messages[i].tokenCount > MAX_CONTEXT_TOKENS) break;
      recentMessages.unshift(messages[i]);
      currentTokens += messages[i].tokenCount;
    }

    return [summaryMessage, ...recentMessages];
  }

  // 没有摘要,保留最近的消息
  const result: ConversationMessage[] = [];
  let currentTokens = 0;

  for (let i = messages.length - 1; i >= 0; i--) {
    if (currentTokens + messages[i].tokenCount > MAX_CONTEXT_TOKENS) break;
    result.unshift(messages[i]);
    currentTokens += messages[i].tokenCount;
  }

  return result;
}

export async function generateSummary(
  conversationId: string,
  messages: ConversationMessage[]
): Promise<string> {
  const conversationText = messages
    .map((m) => `${m.role}: ${m.content}`)
    .join('\n');

  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model: 'gpt-4o-mini',
      messages: [
        {
          role: 'system',
          content: '请用2-3句话总结以下对话的关键信息和结论。',
        },
        { role: 'user', content: conversationText },
      ],
      max_tokens: 200,
    }),
  });

  const data = await response.json();
  const summary = data.choices?.[0]?.message?.content || '';

  const conversation = await getConversation(conversationId);
  if (conversation) {
    conversation.summary = summary;
    await saveConversation(conversation);
  }

  return summary;
}

export { estimateTokenCount, MAX_CONTEXT_TOKENS, SUMMARY_THRESHOLD };
export type { ConversationMessage, ConversationState };

对话管理Route Handler

// app/api/chat/managed/route.ts
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { NextRequest } from 'next/server';
import {
  getConversation,
  saveConversation,
  createContextWindow,
  generateSummary,
  estimateTokenCount,
  SUMMARY_THRESHOLD,
} from '@/lib/ai/conversation-state';
import type { ConversationMessage, ConversationState } from '@/lib/ai/conversation-state';

export const runtime = 'edge';
export const maxDuration = 60;

export async function POST(request: NextRequest) {
  const { message, conversationId, userId } = await request.json();

  let conversation = await getConversation(conversationId);

  if (!conversation) {
    conversation = {
      id: conversationId,
      userId,
      title: message.slice(0, 30),
      messages: [],
      totalTokens: 0,
      createdAt: Date.now(),
      updatedAt: Date.now(),
    };
  }

  const userMsg: ConversationMessage = {
    id: crypto.randomUUID(),
    role: 'user',
    content: message,
    timestamp: Date.now(),
    tokenCount: estimateTokenCount(message),
  };

  conversation.messages.push(userMsg);
  conversation.totalTokens += userMsg.tokenCount;

  // 如果超过摘要阈值,异步生成摘要
  if (conversation.totalTokens > SUMMARY_THRESHOLD && !conversation.summary) {
    generateSummary(conversationId, conversation.messages.slice(0, -3)).catch(
      console.error
    );
  }

  const contextMessages = await createContextWindow(conversationId);

  const result = streamText({
    model: openai('gpt-4o'),
    system: '你是一个有帮助的AI助手,用简洁准确的方式回答问题。',
    messages: contextMessages.map((m) => ({
      role: m.role as 'system' | 'user' | 'assistant',
      content: m.content,
    })),
    maxTokens: 4096,
    onFinish: async ({ text, usage }) => {
      const assistantMsg: ConversationMessage = {
        id: crypto.randomUUID(),
        role: 'assistant',
        content: text,
        timestamp: Date.now(),
        tokenCount: usage?.totalTokens || estimateTokenCount(text),
      };

      conversation!.messages.push(assistantMsg);
      conversation!.totalTokens += assistantMsg.tokenCount;
      await saveConversation(conversation!);
    },
  });

  return result.toDataStreamResponse();
}

Pattern 6: 生产部署与性能优化

并发连接管理

// lib/ai/connection-pool.ts
import { Redis } from '@upstash/redis';

const redis = new Redis({
  url: process.env.REDIS_URL!,
  token: process.env.REDIS_TOKEN!,
});

const MAX_CONCURRENT_CONNECTIONS = 100;
const CONNECTION_TTL_SECONDS = 120;

export async function acquireConnection(userId: string): Promise<boolean> {
  const key = `conn:${userId}`;
  const current = await redis.incr(key);

  if (current === 1) {
    await redis.expire(key, CONNECTION_TTL_SECONDS);
  }

  if (current > MAX_CONCURRENT_CONNECTIONS) {
    await redis.decr(key);
    return false;
  }

  return true;
}

export async function releaseConnection(userId: string): Promise<void> {
  const key = `conn:${userId}`;
  const current = await redis.decr(key);
  if (current <= 0) {
    await redis.del(key);
  }
}

export async function getConnectionCount(userId: string): Promise<number> {
  const count = await redis.get<number>(`conn:${userId}`);
  return count || 0;
}

限流中间件

// lib/ai/rate-limiter.ts
import { Redis } from '@upstash/redis';
import { NextRequest, NextResponse } from 'next/server';

const redis = new Redis({
  url: process.env.REDIS_URL!,
  token: process.env.REDIS_TOKEN!,
});

interface RateLimitConfig {
  windowMs: number;
  maxRequests: number;
}

const RATE_LIMITS: Record<string, RateLimitConfig> = {
  free: { windowMs: 60000, maxRequests: 10 },
  pro: { windowMs: 60000, maxRequests: 60 },
  enterprise: { windowMs: 60000, maxRequests: 300 },
};

export async function rateLimitMiddleware(
  request: NextRequest,
  userId: string,
  tier: string = 'free'
): Promise<NextResponse | null> {
  const config = RATE_LIMITS[tier] || RATE_LIMITS.free;
  const key = `rate:${userId}:${Math.floor(Date.now() / config.windowMs)}`;

  const current = await redis.incr(key);
  if (current === 1) {
    await redis.expire(key, Math.ceil(config.windowMs / 1000));
  }

  if (current > config.maxRequests) {
    return NextResponse.json(
      {
        error: '请求过于频繁,请稍后再试',
        retryAfter: config.windowMs / 1000,
      },
      {
        status: 429,
        headers: {
          'Retry-After': String(Math.ceil(config.windowMs / 1000)),
          'X-RateLimit-Limit': String(config.maxRequests),
          'X-RateLimit-Remaining': '0',
        },
      }
    );
  }

  return null;
}

带完整中间件的Chat API

// app/api/chat/production/route.ts
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { NextRequest, NextResponse } from 'next/server';
import { acquireConnection, releaseConnection } from '@/lib/ai/connection-pool';
import { rateLimitMiddleware } from '@/lib/ai/rate-limiter';
import { routeModel } from '@/lib/ai/model-router';

export const runtime = 'edge';
export const maxDuration = 60;

export async function POST(request: NextRequest) {
  const body = await request.json();
  const { messages, userId = 'anonymous', complexity = 'medium' } = body;

  // 1. 限流检查
  const rateLimitResponse = await rateLimitMiddleware(request, userId);
  if (rateLimitResponse) return rateLimitResponse;

  // 2. 连接数检查
  const acquired = await acquireConnection(userId);
  if (!acquired) {
    return NextResponse.json(
      { error: '并发连接数超限,请稍后再试' },
      { status: 429 }
    );
  }

  try {
    // 3. 模型路由
    const route = routeModel(complexity);

    // 4. 流式生成
    const result = streamText({
      model: route.model,
      system: '你是一个有帮助的AI助手。',
      messages,
      maxTokens: 4096,
      abortSignal: request.signal,
    });

    // 5. 包装响应,确保连接释放
    const response = result.toDataStreamResponse({
      headers: {
        'X-Model-Name': route.modelName,
        'X-Request-Id': crypto.randomUUID(),
      },
    });

    // Clone response to ensure cleanup runs
    const [body1, body2] = [response.body!, response.body!];
    const finalResponse = new Response(body1, {
      status: response.status,
      headers: response.headers,
    });

    // 监听流结束释放连接
    const reader = body2.getReader();
    (async () => {
      try {
        while (true) {
          const { done } = await reader.read();
          if (done) break;
        }
      } finally {
        await releaseConnection(userId);
      }
    })();

    return finalResponse;
  } catch (error) {
    await releaseConnection(userId);
    return NextResponse.json(
      { error: `AI服务异常: ${String(error)}` },
      { status: 500 }
    );
  }
}

健康检查与监控

// app/api/health/ai/route.ts
import { NextResponse } from 'next/server';

interface HealthCheck {
  service: string;
  status: 'healthy' | 'degraded' | 'down';
  latencyMs: number;
  error?: string;
}

async function checkOpenAI(): Promise<HealthCheck> {
  const start = Date.now();
  try {
    const res = await fetch('https://api.openai.com/v1/models', {
      headers: { Authorization: `Bearer ${process.env.OPENAI_API_KEY}` },
      signal: AbortSignal.timeout(5000),
    });
    return {
      service: 'openai',
      status: res.ok ? 'healthy' : 'degraded',
      latencyMs: Date.now() - start,
    };
  } catch (error) {
    return {
      service: 'openai',
      status: 'down',
      latencyMs: Date.now() - start,
      error: String(error),
    };
  }
}

async function checkAnthropic(): Promise<HealthCheck> {
  const start = Date.now();
  try {
    const res = await fetch('https://api.anthropic.com/v1/models', {
      headers: {
        'x-api-key': process.env.ANTHROPIC_API_KEY!,
        'anthropic-version': '2023-06-01',
      },
      signal: AbortSignal.timeout(5000),
    });
    return {
      service: 'anthropic',
      status: res.ok ? 'healthy' : 'degraded',
      latencyMs: Date.now() - start,
    };
  } catch (error) {
    return {
      service: 'anthropic',
      status: 'down',
      latencyMs: Date.now() - start,
      error: String(error),
    };
  }
}

export async function GET() {
  const checks = await Promise.all([checkOpenAI(), checkAnthropic()]);

  const overallStatus = checks.every((c) => c.status === 'healthy')
    ? 'healthy'
    : checks.some((c) => c.status === 'healthy')
    ? 'degraded'
    : 'down';

  return NextResponse.json({
    status: overallStatus,
    timestamp: new Date().toISOString(),
    checks,
  });
}

5个常见坑及解决方案

坑1:SSE连接在Nginx/CDN层被缓冲

现象:流式响应变成一次性返回,用户看到的是等了很久然后整段文字出现。

原因:Nginx默认开启proxy_buffering,CDN也会缓冲SSE响应。

解决

# nginx.conf
location /api/chat {
    proxy_pass http://nextjs_backend;
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
    proxy_read_timeout 300s;
}
// 在Route Handler中添加响应头
return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    'X-Accel-Buffering': 'no', // 告诉Nginx不要缓冲
    Connection: 'keep-alive',
  },
});

坑2:Edge Runtime下无法使用Node.js原生模块

现象:部署到Vercel Edge Functions时报错Module not found

原因:Edge Runtime不支持Node.js的netfschild_process等模块。

解决:使用Edge兼容的替代库,如@upstash/redis替代ioredisfetch替代http

坑3:useChat的messages状态与外部状态不同步

现象:在useChat外维护了一份messages,但两者不同步。

原因useChat内部管理自己的messages状态,外部修改不会反映到内部。

解决:使用useChatonFinish回调同步状态,或使用setMessages方法。

const { messages, setMessages, ...rest } = useChat({
  onFinish: (message) => {
    // 同步到外部状态
    externalStore.update(message);
  },
});

// 从外部恢复状态
useEffect(() => {
  setMessages(loadedMessages);
}, [loadedMessages]);

坑4:流式响应中断后无法恢复

现象:网络抖动导致SSE连接断开,已接收的内容丢失。

原因:SSE没有内置的断点续传机制(与gRPC不同)。

解决:客户端缓存已接收内容,重连时将已有内容作为上下文重新请求。

// 客户端断线恢复
async function recoverStream(
  conversationId: string,
  lastReceivedContent: string
) {
  const response = await fetch('/api/chat/recover', {
    method: 'POST',
    body: JSON.stringify({
      conversationId,
      lastContent: lastReceivedContent,
    }),
  });

  return response.body;
}

坑5:大量并发SSE连接导致内存泄漏

现象:服务器内存持续增长,最终OOM。

原因:未正确关闭ReadableStreamAbortController

解决:确保所有流都有超时和清理机制。

const stream = new ReadableStream({
  start(controller) {
    const timeout = setTimeout(() => {
      controller.close();
    }, 60000); // 60秒超时

    // ... 流式逻辑

    return () => clearTimeout(timeout);
  },
});

10个常见报错排查

# 报错信息 原因 解决方案
1 TypeError: response.body is null Route Handler未返回流式响应 确认返回ReadableStream或使用toDataStreamResponse()
2 AI_APICallError: 429 Too Many Requests LLM API限流 实现指数退避重试,或切换到备选模型
3 AI_APICallError: context_length_exceeded 对话历史超过模型上下文窗口 实现上下文窗口裁剪或摘要压缩
4 Error: Invalid SSE data SSE数据格式不正确 检查data:前缀和\n\n分隔符
5 AbortError: The operation was aborted 用户取消或请求超时 正确处理AbortSignal,清理资源
6 Error: Cannot read properties of undefined (reading 'delta') LLM返回了非标准格式的chunk 添加防御性解析,跳过格式异常的chunk
7 RuntimeError: Edge Runtime does not support Node.js API 在Edge Runtime中使用了Node.js API 切换到Node.js Runtime或使用Edge兼容库
8 Error: Maximum call stack size exceeded 递归处理流式数据导致栈溢出 使用迭代替代递归处理chunk
9 TypeError: Failed to execute 'fetch' on 'Window' 浏览器CORS限制 确保API路由和前端同源,或配置CORS头
10 Error: Stream ended unexpectedly 服务端流提前关闭 添加心跳机制检测连接状态,实现自动重连

进阶优化技巧

1. 流式Markdown渲染

// components/StreamMarkdown.tsx
'use client';

import { memo, useMemo } from 'react';

interface StreamMarkdownProps {
  content: string;
  isStreaming: boolean;
}

const StreamMarkdown = memo(function StreamMarkdown({
  content,
  isStreaming,
}: StreamMarkdownProps) {
  const html = useMemo(() => {
    // 简单的Markdown到HTML转换
    return content
      .replace(/```(\w*)\n([\s\S]*?)```/g, '<pre><code class="language-$1">$2</code></pre>')
      .replace(/`([^`]+)`/g, '<code>$1</code>')
      .replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>')
      .replace(/\*([^*]+)\*/g, '<em>$1</em>')
      .replace(/^### (.+)$/gm, '<h3>$1</h3>')
      .replace(/^## (.+)$/gm, '<h2>$1</h2>')
      .replace(/^# (.+)$/gm, '<h1>$1</h1>')
      .replace(/\n/g, '<br/>');
  }, [content]);

  return (
    <div className="prose prose-sm max-w-none">
      <div dangerouslySetInnerHTML={{ __html: html }} />
      {isStreaming && (
        <span className="inline-block w-2 h-4 bg-blue-600 animate-pulse ml-0.5" />
      )}
    </div>
  );
});

export default StreamMarkdown;

2. 预测性预取

// lib/ai/prefetch.ts
export function predictNextQuery(messages: Array<{ role: string; content: string }>): string | null {
  const lastMessage = messages[messages.length - 1];
  if (!lastMessage || lastMessage.role !== 'assistant') return null;

  // 如果AI回复中包含代码,预测用户可能想运行
  if (lastMessage.content.includes('```')) {
    return '请帮我运行这段代码';
  }

  // 如果AI回复中包含列表,预测用户可能想展开
  if (lastMessage.content.includes('1.') || lastMessage.content.includes('- ')) {
    return '请详细解释第一点';
  }

  return null;
}

3. 流式响应缓存

// lib/ai/stream-cache.ts
import { Redis } from '@upstash/redis';

const redis = new Redis({
  url: process.env.REDIS_URL!,
  token: process.env.REDIS_TOKEN!,
});

export async function getCachedStream(queryHash: string): Promise<string | null> {
  return redis.get<string>(`stream-cache:${queryHash}`);
}

export async function cacheStreamResponse(
  queryHash: string,
  response: string,
  ttlSeconds: number = 3600
): Promise<void> {
  await redis.set(`stream-cache:${queryHash}`, response, { ex: ttlSeconds });
}

export function computeQueryHash(
  messages: Array<{ role: string; content: string }>,
  model: string
): string {
  const raw = JSON.stringify({ messages, model });
  // 简单hash,生产环境用crypto.subtle.digest
  let hash = 0;
  for (let i = 0; i < raw.length; i++) {
    const char = raw.charCodeAt(i);
    hash = ((hash << 5) - hash) + char;
    hash |= 0;
  }
  return hash.toString(36);
}

对比分析:SSE vs WebSocket vs 长轮询

维度 SSE WebSocket 长轮询
传输方向 单向(服务端→客户端) 双向 单向
协议 HTTP/1.1+ ws/wss HTTP
自动重连 浏览器原生支持 需手动实现 每次都是新请求
代理/CDN 友好(标准HTTP) 可能被拦截 完全兼容
二进制数据 不支持 支持 不支持
帧开销 较高(文本格式) 低(2字节) 高(每次HTTP头)
浏览器兼容 IE不支持 广泛支持 广泛支持
连接数限制 6个/域名(HTTP/1.1) 无限制 无实际限制
LLM适用性 ★★★★★ ★★★ ★★
实现复杂度 ★★ ★★★★

结论:LLM流式输出是典型的单向推送场景,SSE是最佳选择。只有在需要双向实时交互(如语音对话、实时协作编辑)时才考虑WebSocket。


在线工具推荐

相关文章

外部资源


总结

Next.js 15流式AI聊天的6种生产模式各有适用场景:

模式 适用场景 复杂度 推荐度
SSE流式响应 需要完全控制流式协议 ★★★ ★★★★
React Server Actions 类型安全优先,快速开发 ★★ ★★★★
Vercel AI SDK 快速原型,多LLM支持 ★★★★★
多模型路由 生产级容灾,成本优化 ★★★★ ★★★★
对话状态管理 长对话,上下文敏感 ★★★★ ★★★★
生产部署优化 企业级上线 ★★★★★ ★★★★★

核心建议:从Vercel AI SDK开始快速验证,逐步引入多模型路由和状态管理,最后完善生产部署。Next.js 15的流式AI聊天不再是技术难题,关键是选对模式、避开常见坑。

本站提供浏览器本地工具,免注册即可试用 →

#Next.js#AI聊天#流式响应#SSE#React Server Actions#LLM#2026#前端工程