Python MCP Client Integration in Practice: 7 Production Patterns from Protocol Parsing to Multi-Tool Orchestration

Why Can't Your AI Application Connect to External Tools? 5 Pain Points of MCP Client Integration

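You spent three days writing a flawless MCP Server, and Claude Desktop still can't connect to it? Your AI Agent calls three tools, the first one times out, and everything after it fails? Tool discovery returns a pile of schemas and you're not sure how to turn them into a function-calling format the LLM can use? Your SSE connection keeps dropping and reconnecting, and you lose data without noticing? In multi-tool orchestration, running tools serially is too slow, but running them in parallel turns into chaos?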

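The root cause: most developers focus only on building the MCP Server and overlook how complex the client side is to integrate. MCP defines a standard way to communicate, but the client still has to solve a whole set of engineering problems: constructing JSON-RPC messages, adapting to the transport layer, discovering and converting tools, choosing an orchestration strategy, and recovering from errors.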

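Key Takeaways

  • Master MCP's JSON-RPC 2.0 messaging and build a reliable client-server connection
  • Use tool discovery and dynamic registration so your AI Agent adapts to new tools automatically
  • Understand 7 production patterns for multi-tool orchestration, from serial calls to parallel DAG execution
  • Build robust error handling and retries to cope with network jitter and unavailable services
  • Handle SSE streaming so tool-call progress can be reported in real time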

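Core MCP Protocol Concepts

Before diving into the 7 production patterns, it helps to understand where the MCP client sits in the protocol stack and which core concepts it has to handle.

MCP Client Architecture Overview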

┌─────────────────────────────────────────────────┐
│                  AI Application                  │
│  (Claude Desktop / ChatGPT / custom Agent)       │
├─────────────────────────────────────────────────┤
│               MCP Client Layer                   │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ JSON-RPC  │ │Transport │ │ Tool Registry  │  │
│  │ Handler   │ │ Adapter  │ │ & Discovery    │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ Orchest-  │ │ Error &  │ │ Stream         │  │
│  │ ration    │ │ Retry    │ │ Handler        │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
├─────────────────────────────────────────────────┤
│              Transport Layer                     │
│     stdio / SSE / Streamable HTTP               │
├─────────────────────────────────────────────────┤
│              MCP Server                          │
│  Tools / Resources / Prompts                    │
└─────────────────────────────────────────────────┘

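Core Concepts at a Glance

Concept         Description                              JSON-RPC method           Analogy
Initialize      Client-server handshake                  initialize                TCP three-way handshake
Tool Discovery  List the tools the server provides       tools/list                DNS lookup
Tool Call       Invoke a tool and get its result         tools/call                HTTP POST
Resource Read   Read a resource the server exposes       resources/read            HTTP GET
Prompt Get      Fetch a predefined prompt template       prompts/get               Template engine
Notification    One-way message, no response expected    (no id field)             UDP broadcast
Cancel          Cancel an in-flight request              notifications/cancelled   Aborting an HTTP request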

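JSON-RPC 2.0 Message Format

MCP is built on JSON-RPC 2.0, so understanding its message format is the foundation of client development. A request carries an id, a successful response echoes that id with a result, and an error response echoes it with an error object:

// Request
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "search_docs",
    "arguments": {"query": "MCP protocol"}
  },
  "id": 1
}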

// Successful response
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {"type": "text", "text": "Search results..."}
    ]
  },
  "id": 1
}

// Error response
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32600,
    "message": "Invalid Request"
  },
  "id": 1
}

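If you're interested in MCP Server development, see our earlier article Python MCP Server Development in Practice. For the lower-level AI function-calling protocol, we recommend AI Function Calling in Production.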


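Pattern 1: Basic JSON-RPC Client

Problem: your AI application needs to connect to an MCP Server, but you are not sure how to build correct JSON-RPC messages, manage request IDs, or handle responses.

Solution: build a complete JSON-RPC 2.0 client that wraps message construction, sending, response matching, and timeout handling.

Full Implementation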

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class JsonRpcRequest:
    jsonrpc: str = "2.0"
    method: str = ""
    params: dict[str, Any] = field(default_factory=dict)
    id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)


@dataclass
class JsonRpcResponse:
    id: int | str
    result: Any = None
    error: dict[str, Any] | None = None


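class JsonRpcClient:
    def __init__(self, timeout: float = 30.0):
        self._request_id = 0
        self._pending: dict[int | str, asyncio.Future] = {}
        self._timeout = timeout
        self._notification_handlers: dict[str, Callable] = {}
        self._writer: asyncio.StreamWriter | None = None
        self._reader: asyncio.StreamReader | None = None
        self._process: asyncio.subprocess.Process | None = None
        self._read_task: asyncio.Task | None = None
        # Strong references to background handler tasks so they are not garbage-collected
        self._background: set[asyncio.Task] = set()

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
        import os
        proc_env = {**os.environ, **(env or {})}
        self._process = await asyncio.create_subprocess_exec(
            command,
            *(args or []),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # stderr is inherited (server logs go to our stderr); an unread PIPE
            # eventually fills up and blocks the server process
            stderr=None,
            env=proc_env,
        )
        self._reader = self._process.stdout
        self._writer = self._process.stdin
        self._read_task = asyncio.create_task(self._read_loop())

    async def _read_loop(self):
        try:
            while self._reader and not self._reader.at_eof():
                line = await self._reader.readline()
                if not line.strip():
                    continue
                try:
                    message = json.loads(line)
                except json.JSONDecodeError:
                    continue  # ignore non-JSON noise on stdout
                await self._handle_message(message)
        finally:
            # The server went away: fail pending requests now instead of at their timeout
            for future in self._pending.values():
                if not future.done():
                    future.set_exception(ConnectionError("MCP server closed the connection"))

    async def _handle_message(self, message: dict):
        # Check "method" first: server-to-client requests carry an id too, and treating
        # them as responses could resolve the wrong pending Future
        if "method" in message:
            if "id" in message:
                await self._answer_server_request(message)
                return
            handler = self._notification_handlers.get(message["method"])
            if handler:
                # Run in the background: a handler that awaits send_request() would
                # otherwise block this read loop and never see its own response
                task = asyncio.create_task(handler(message.get("params", {})))
                self._background.add(task)
                task.add_done_callback(self._background.discard)
        elif "id" in message:
            future = self._pending.get(message["id"])
            if future and not future.done():
                if "error" in message:
                    error = message["error"]
                    future.set_exception(McpError(error.get("code", -32603), error.get("message", "")))
                else:
                    future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))

    async def _answer_server_request(self, message: dict):
        if message["method"] == "ping":
            reply = {"jsonrpc": "2.0", "id": message["id"], "result": {}}
        else:
            # This client implements no other server-to-client methods (sampling, roots, ...)
            reply = {"jsonrpc": "2.0", "id": message["id"],
                     "error": {"code": -32601, "message": f"Method not found: {message['method']}"}}
        await self._write(reply)

    async def _write(self, message: dict):
        if not self._writer:
            raise ConnectionError("Not connected: call connect_stdio() first")
        # stdio framing: one JSON object per line, no embedded newlines
        self._writer.write((json.dumps(message) + "\n").encode())
        await self._writer.drain()

    async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        request_id = self._next_id()
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await self._write({"jsonrpc": "2.0", "method": method, "params": params or {}, "id": request_id})
            response = await asyncio.wait_for(future, timeout=self._timeout)
            return response.result
        except asyncio.TimeoutError:
            try:
                # Ask the server to stop working on the abandoned request
                await self.send_notification(
                    "notifications/cancelled", {"requestId": request_id, "reason": "timeout"}
                )
            except Exception:
                pass
            raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s")
        finally:
            # Remove the entry whether the call succeeded, timed out, or failed to send
            self._pending.pop(request_id, None)

    async def send_notification(self, method: str, params: dict[str, Any] | None = None):
        await self._write({"jsonrpc": "2.0", "method": method, "params": params or {}})

    def on_notification(self, method: str, handler: Callable):
        self._notification_handlers[method] = handler

    async def close(self):
        if self._writer:
            # Closing stdin is the polite shutdown signal for a stdio server
            self._writer.close()
        if self._process and self._process.returncode is None:
            try:
                await asyncio.wait_for(self._process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._process.terminate()
                await self._process.wait()
        if self._read_task:
            self._read_task.cancel()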


class McpError(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"MCP Error [{code}]: {message}")


class McpTimeoutError(Exception):
    pass
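The heart of `JsonRpcClient` is matching each response to its pending `Future` by `id`, which is what lets responses come back in any order. The sketch below isolates that idea so it runs without a real server: a hypothetical `fake_server` coroutine talks to a stripped-down, hypothetical `MiniClient` over two `asyncio.Queue` objects instead of stdio pipes, and deliberately answers in reverse order.

```python
import asyncio
import json
from typing import Any


async def fake_server(inbox: asyncio.Queue, outbox: asyncio.Queue, expected: int) -> None:
    """Hypothetical server: read `expected` requests, then answer them in reverse order."""
    requests = [json.loads(await inbox.get()) for _ in range(expected)]
    for request in reversed(requests):
        await outbox.put(json.dumps({"jsonrpc": "2.0", "id": request["id"],
                                     "result": {"echo": request["params"]}}))


class MiniClient:
    def __init__(self, to_server: asyncio.Queue, from_server: asyncio.Queue):
        self._to_server = to_server
        self._from_server = from_server
        self._next_id = 0
        self._pending: dict[int, asyncio.Future] = {}

    async def read_loop(self) -> None:
        while True:
            message = json.loads(await self._from_server.get())
            # Route by id, not by arrival order
            future = self._pending.pop(message["id"], None)
            if future is not None and not future.done():
                future.set_result(message["result"])

    async def request(self, method: str, params: dict[str, Any]) -> Any:
        self._next_id += 1
        request_id = self._next_id  # local copy: other calls keep incrementing the counter
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        await self._to_server.put(json.dumps(
            {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}))
        return await asyncio.wait_for(future, timeout=1.0)


async def demo() -> list[Any]:
    to_server: asyncio.Queue = asyncio.Queue()
    from_server: asyncio.Queue = asyncio.Queue()
    client = MiniClient(to_server, from_server)
    server = asyncio.create_task(fake_server(to_server, from_server, expected=3))
    reader = asyncio.create_task(client.read_loop())
    try:
        return list(await asyncio.gather(*(client.request("tools/call", {"n": n}) for n in range(3))))
    finally:
        reader.cancel()
        await asyncio.gather(server, reader, return_exceptions=True)


if __name__ == "__main__":
    # gather keeps call order, so this lists n = 0, 1, 2 even though replies arrived reversed
    print(asyncio.run(demo()))
```

The local `request_id` copy matters as soon as several requests are in flight at once: reading the shared counter again after an `await` could point at somebody else's request.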

Initialization Handshake

async def initialize_client(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        # Declare only capabilities this client actually implements: advertising
        # roots or sampling invites server requests that nothing here would handle
        "capabilities": {},
        "clientInfo": {
            "name": "toolsku-mcp-client",
            "version": "1.0.0",
        },
    })
    await client.send_notification("notifications/initialized")
    return result
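`initialize_client` returns the server's result without inspecting it. Under the MCP lifecycle rules, the server answers with the requested `protocolVersion` if it supports it and otherwise with another version it supports, and a client that cannot speak the returned version should disconnect. A minimal check might look like the following; the `SUPPORTED_VERSIONS` set, `ProtocolVersionError`, and the shape of the returned summary are assumptions of this sketch.

```python
from typing import Any

# Assumption: the protocol revisions this particular client was written against
SUPPORTED_VERSIONS = frozenset({"2025-03-26", "2024-11-05"})


class ProtocolVersionError(Exception):
    pass


def check_initialize_result(result: dict[str, Any],
                            supported: frozenset[str] = SUPPORTED_VERSIONS) -> dict[str, Any]:
    """Validate the initialize result before sending notifications/initialized."""
    version = result.get("protocolVersion")
    if version not in supported:
        # The server picked a revision we cannot speak: disconnect instead of guessing
        raise ProtocolVersionError(f"Unsupported protocol version: {version!r}")
    capabilities = result.get("capabilities", {})
    return {
        "version": version,
        "server": result.get("serverInfo", {}).get("name", "unknown"),
        # Rely only on features the server actually advertised
        "has_tools": "tools" in capabilities,
        "tools_list_changed": bool(capabilities.get("tools", {}).get("listChanged")),
    }


info = check_initialize_result({
    "protocolVersion": "2025-03-26",
    "capabilities": {"tools": {"listChanged": True}},
    "serverInfo": {"name": "demo-server", "version": "0.1.0"},
})
print(info["server"], info["tools_list_changed"])  # demo-server True
```

The `tools_list_changed` flag is what tells you whether subscribing to `notifications/tools/list_changed` (Pattern 3) is worth doing for this server.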

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        server_info = await initialize_client(client)
        print(f"Connected to: {server_info['serverInfo']['name']}")

        tools = await client.send_request("tools/list")
        for tool in tools.get("tools", []):
            # description is optional in the MCP tool schema
            print(f"  - {tool['name']}: {tool.get('description', '')}")

        result = await client.send_request("tools/call", {
            "name": "search_docs",
            "arguments": {"query": "MCP protocol parsing"},
        })
        print(f"Result: {result}")
    finally:
        # Always shut the server process down, even if a call above failed
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())

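Pattern 2: SSE Transport Layer

Problem: the stdio transport only works for local child processes. In production you usually need to reach a remote MCP Server over the network, and SSE (Server-Sent Events) is the remote transport MCP defines for that. Note that HTTP+SSE comes from the 2024-11-05 revision of the spec; the 2025-03-26 revision replaces it with Streamable HTTP and keeps HTTP+SSE only for backward compatibility, so check which transport your server exposes.

Solution: implement a complete SSE transport layer covering connection management, event parsing, message routing, and automatic reconnection.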

SSE Transport Architecture

┌──────────────┐      POST /messages      ┌──────────────┐
│  MCP Client  │ ───────────────────────> │  MCP Server  │
│              │                          │              │
│  SSE Handler │ <──── GET /sse ───────── │ SSE Endpoint │
│              │       EventStream        │              │
└──────────────┘                          └──────────────┘
       │                                         │
       │ 1. GET /sse to open the SSE stream      │
       │ 2. Receive endpoint event (POST URL)    │
       │ 3. POST /messages with JSON-RPC request │
       │ 4. Receive JSON-RPC responses via SSE   │
       └─────────────────────────────────────────┘

Full Implementation

import asyncio
import json
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
import httpx

logger = logging.getLogger(__name__)


@dataclass
class SseEvent:
    event: str = "message"
    data: str = ""
    id: str | None = None
    retry: int | None = None


class SseTransport:
    def __init__(
        self,
        server_url: str,
        reconnect_interval: float = 5.0,
        max_reconnect_attempts: int = 10,
        headers: dict[str, str] | None = None,
        endpoint_timeout: float = 10.0,
    ):
        self._server_url = server_url.rstrip("/")
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_attempts = max_reconnect_attempts
        self._endpoint_timeout = endpoint_timeout
        self._headers = headers or {}
        self._message_endpoint: str | None = None
        # Set once the server's "endpoint" event has told us where to POST
        self._endpoint_ready = asyncio.Event()
        self._session: httpx.AsyncClient | None = None
        self._listener_task: asyncio.Task | None = None
        self._running = False
        self._reconnect_count = 0
        self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
        self._last_event_id: str | None = None

    async def connect(self):
        self._session = httpx.AsyncClient(
            headers=self._headers,
            timeout=httpx.Timeout(30.0, read=300.0),
        )
        self._running = True
        self._listener_task = asyncio.create_task(self._sse_listener())
        # Without this wait, a request sent right after connect() races the
        # endpoint event and fails with "endpoint not received"
        try:
            await asyncio.wait_for(self._endpoint_ready.wait(), timeout=self._endpoint_timeout)
        except asyncio.TimeoutError:
            await self.close()
            raise RuntimeError("SSE server did not send an endpoint event in time")

    async def _sse_listener(self):
        sse_url = f"{self._server_url}/sse"
        while self._running:
            try:
                headers = {"Accept": "text/event-stream"}
                if self._last_event_id:
                    # Last-Event-ID is an HTTP request header in the SSE spec, not a query parameter
                    headers["Last-Event-ID"] = self._last_event_id

                async with self._session.stream("GET", sse_url, headers=headers) as response:
                    response.raise_for_status()
                    self._reconnect_count = 0

                    current_event = SseEvent()
                    async for line in response.aiter_lines():
                        if not self._running:
                            break

                        if line.startswith(":"):
                            continue  # comment / keep-alive

                        if line == "":
                            # A blank line terminates the current event
                            if current_event.data:
                                await self._dispatch_event(current_event)
                            current_event = SseEvent()
                            continue

                        # A line without ":" is a field name with an empty value
                        field_name, _, field_value = line.partition(":")
                        if field_value.startswith(" "):
                            field_value = field_value[1:]  # strip exactly one leading space

                        if field_name == "event":
                            current_event.event = field_value
                        elif field_name == "data":
                            if current_event.data:
                                current_event.data += "\n"
                            current_event.data += field_value
                        elif field_name == "id":
                            current_event.id = field_value
                            self._last_event_id = field_value
                        elif field_name == "retry":
                            try:
                                current_event.retry = int(field_value)
                            except ValueError:
                                pass

            except httpx.HTTPStatusError as e:
                logger.error(f"SSE HTTP error: {e.response.status_code}")
            except httpx.RequestError as e:
                logger.error(f"SSE connection error: {e}")
            except Exception as e:
                logger.error(f"SSE unexpected error: {e}")

            if self._running:
                # A new SSE stream is a new server session: the old endpoint is invalid,
                # and the server will typically expect a fresh initialize handshake
                self._message_endpoint = None
                self._endpoint_ready.clear()
                self._reconnect_count += 1
                if self._reconnect_count > self._max_reconnect_attempts:
                    logger.error("Max reconnect attempts reached, stopping")
                    self._running = False
                    break
                wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
                logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
                await asyncio.sleep(wait)

    async def _dispatch_event(self, event: SseEvent):
        if event.event == "endpoint":
            # The endpoint may be relative (e.g. "/messages?sessionId=...") or absolute
            self._message_endpoint = str(httpx.URL(f"{self._server_url}/sse").join(event.data))
            self._endpoint_ready.set()
            logger.info(f"Message endpoint: {self._message_endpoint}")

        handlers = self._event_handlers.get(event.event, [])
        for handler in handlers:
            try:
                if event.event == "endpoint":
                    await handler(event.data)
                else:
                    message = json.loads(event.data)
                    await handler(message)
            except Exception as e:
                logger.error(f"Event handler error: {e}")

    async def send_message(self, message: dict[str, Any]) -> None:
        if not self._session:
            raise RuntimeError("SSE session not connected")
        if not self._message_endpoint:
            raise RuntimeError("SSE transport not initialized, endpoint not received")

        # json= serializes the body and sets Content-Type: application/json
        response = await self._session.post(self._message_endpoint, json=message)
        response.raise_for_status()

    def on_event(self, event_type: str, handler: Callable):
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    async def close(self):
        self._running = False
        if self._listener_task:
            # Cancel the listener, which is usually blocked waiting for the next SSE line
            self._listener_task.cancel()
            try:
                await self._listener_task
            except asyncio.CancelledError:
                pass
        if self._session:
            await self._session.aclose()

Client Integration

class McpSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._request_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._transport.on_event("message", self._on_message)

    async def connect(self):
        await self._transport.connect()

    async def _on_message(self, message: dict):
        if "id" in message:
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
                else:
                    future.set_result(message.get("result"))

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        self._request_id += 1
        # Capture the id locally: concurrent calls keep incrementing self._request_id,
        # so reading it again after an await could pop another request's Future
        request_id = self._request_id
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await self._transport.send_message(request)
            return await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            raise McpTimeoutError(f"Request {method} timed out")
        finally:
            # Also covers a failed POST, which would otherwise leak the pending entry
            self._pending.pop(request_id, None)

    async def initialize(self) -> dict:
        result = await self.send_request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
        })
        await self._transport.send_message({
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        })
        return result

    async def close(self):
        await self._transport.close()


async def main():
    client = McpSseClient("http://localhost:8080")
    await client.connect()
    info = await client.initialize()
    print(f"Connected: {info}")

    tools = await client.send_request("tools/list")
    print(f"Available tools: {tools}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

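Pattern 3: Tool Discovery and Dynamic Registration

Problem: an MCP Server's tool list is dynamic. The client has to discover tools automatically, convert them into the function-calling format the LLM expects, and react to tool-change notifications.

Solution: implement tool discovery, schema conversion, dynamic registration, and change listening.

Tool Discovery Architecture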

┌──────────────┐  tools/list   ┌──────────────┐
│  MCP Client  │ ────────────> │  MCP Server  │
│              │ <──────────── │              │
│  Tool        │  Tool Schema  │  Tool        │
│  Registry    │               │  Definitions │
│              │  tools/       │              │
│  ┌────────┐  │  list_changed │              │
│  │ OpenAI │  │ <──────────── │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Claude │  │               │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Gemini │  │               │              │
│  │ Format │  │               │              │
│  └────────┘  │               │              │
└──────────────┘               └──────────────┘

Full Implementation

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class McpTool:
    name: str
    description: str
    input_schema: dict[str, Any]
    annotations: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mcp(cls, data: dict) -> "McpTool":
        return cls(
            name=data["name"],
            description=data.get("description", ""),
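            # inputSchema is required by MCP; fall back to an empty object schema for
            # non-conforming servers so LLM function-calling formats stay valid
            input_schema=data.get("inputSchema") or {"type": "object", "properties": {}},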
            annotations=data.get("annotations", {}),
        )


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, McpTool] = {}
        self._change_handlers: list[Callable] = []

    def register(self, tool: McpTool):
        self._tools[tool.name] = tool
        logger.info(f"Registered tool: {tool.name}")
        self._notify_change("registered", tool)

    def unregister(self, name: str):
        if name in self._tools:
            tool = self._tools.pop(name)
            logger.info(f"Unregistered tool: {name}")
            self._notify_change("unregistered", tool)

    def get(self, name: str) -> McpTool | None:
        return self._tools.get(name)

    def list_tools(self) -> list[McpTool]:
        return list(self._tools.values())

    def on_change(self, handler: Callable):
        self._change_handlers.append(handler)

    def _notify_change(self, action: str, tool: McpTool):
        for handler in self._change_handlers:
            try:
                handler(action, tool)
            except Exception as e:
                logger.error(f"Change handler error: {e}")

    def to_openai_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                },
            })
        return result

    def to_claude_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema,
            })
        return result

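    def to_gemini_tools(self) -> list[dict]:
        # Gemini function declarations take an OpenAPI-style subset of JSON Schema with
        # upper-case type names; convert recursively so nested objects, array items,
        # enums and parameter descriptions are not lost
        result = []
        for tool in self._tools.values():
            result.append({
                "name": tool.name,
                "description": tool.description,
                "parameters": self._to_gemini_schema(tool.input_schema or {"type": "object"}),
            })
        return result

    @classmethod
    def _to_gemini_schema(cls, schema: dict[str, Any]) -> dict[str, Any]:
        json_type = schema.get("type", "object" if "properties" in schema else "string")
        if isinstance(json_type, list):
            # e.g. ["string", "null"]: keep the first non-null type
            json_type = next((t for t in json_type if t != "null"), "string")
        out: dict[str, Any] = {"type": cls._json_type_to_gemini(json_type)}
        # Carry over only the keywords this converter maps; drop $schema, additionalProperties, etc.
        for key in ("description", "enum"):
            if key in schema:
                out[key] = schema[key]
        if "properties" in schema:
            out["properties"] = {
                name: cls._to_gemini_schema(sub) for name, sub in schema["properties"].items()
            }
        if schema.get("required"):
            out["required"] = list(schema["required"])
        if "items" in schema:
            out["items"] = cls._to_gemini_schema(schema["items"])
        return out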

    @staticmethod
    def _json_type_to_gemini(json_type: str) -> str:
        mapping = {
            "string": "STRING",
            "integer": "INTEGER",
            "number": "NUMBER",
            "boolean": "BOOLEAN",
            "array": "ARRAY",
            "object": "OBJECT",
        }
        return mapping.get(json_type, "STRING")


class ToolDiscovery:
    def __init__(self, client):
        self._client = client
        self._registry = ToolRegistry()
        self._client.on_notification(
            "notifications/tools/list_changed",
            self._on_tools_changed,
        )

    @property
    def registry(self) -> ToolRegistry:
        return self._registry

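    async def discover(self) -> list[McpTool]:
        tools: list[McpTool] = []
        cursor: str | None = None
        while True:
            # tools/list is paginated: keep following nextCursor until it is absent
            params = {"cursor": cursor} if cursor else {}
            result = await self._client.send_request("tools/list", params)
            tools.extend(McpTool.from_mcp(t) for t in result.get("tools", []))
            cursor = result.get("nextCursor")
            if not cursor:
                break

        # Update the registry only after the full list has arrived, so a failed refresh
        # leaves the previous tool set intact; tools that disappeared are unregistered
        current = {tool.name for tool in tools}
        for stale in [t.name for t in self._registry.list_tools() if t.name not in current]:
            self._registry.unregister(stale)
        for tool in tools:
            self._registry.register(tool)
        logger.info(f"Discovered {len(tools)} tools")
        return tools

    async def _on_tools_changed(self, params: dict):
        logger.info("Tools list changed, re-discovering...")
        await self.discover()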

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        tool = self._registry.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")

        result = await self._client.send_request("tools/call", {
            "name": name,
            "arguments": arguments,
        })

        contents = result.get("content", [])
        text_parts = []
        for content in contents:
            if content.get("type") == "text":
                text_parts.append(content["text"])
            elif content.get("type") == "image":
                text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
            elif content.get("type") == "resource":
                text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")

        text = "\n".join(text_parts) if text_parts else str(result)
        if result.get("isError"):
            # Tool execution failures arrive as a normal result flagged with isError,
            # not as a JSON-RPC error, so surface them explicitly to the caller
            raise RuntimeError(f"Tool {name} failed: {text}")
        return text
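One detail `call_tool` has to get right: when a tool fails while executing, the MCP server returns an ordinary result with `isError: true` instead of a JSON-RPC error, so the failure text can still be shown to the model. A small helper that flattens content blocks and reports the flag separately lets each caller decide whether to pass the error back to the LLM or treat it as a failed step. This is a sketch: `flatten_tool_result` is a hypothetical name, the placeholder strings are arbitrary, and `audio` blocks exist only in spec revisions from 2025-03-26 onward.

```python
from typing import Any


def flatten_tool_result(result: dict[str, Any]) -> tuple[str, bool]:
    """Reduce a tools/call result to (text for the LLM, is_error)."""
    parts: list[str] = []
    for block in result.get("content", []):
        kind = block.get("type")
        if kind == "text":
            parts.append(block.get("text", ""))
        elif kind in ("image", "audio"):
            # Binary payloads are base64; pass a placeholder instead of the raw data
            parts.append(f"[{kind}: {block.get('mimeType', 'unknown')}]")
        elif kind == "resource":
            resource = block.get("resource", {})
            # Embedded text resources carry their content inline; otherwise show the URI
            parts.append(resource.get("text") or f"[resource: {resource.get('uri', '')}]")
    return "\n".join(parts), bool(result.get("isError", False))


text, failed = flatten_tool_result({
    "content": [{"type": "text", "text": "rate limit exceeded"}],
    "isError": True,
})
print(failed, text)  # True rate limit exceeded
```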

Using Dynamic Registration

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    tools = await discovery.discover()

    print(f"Discovered {len(tools)} tools:")
    for tool in tools:
        print(f"  - {tool.name}: {tool.description}")

    openai_tools = discovery.registry.to_openai_tools()
    print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2, ensure_ascii=False)}")

    claude_tools = discovery.registry.to_claude_tools()
    print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2, ensure_ascii=False)}")

    result = await discovery.call_tool("search_docs", {"query": "MCP protocol parsing"})
    print(f"\nTool result: {result}")

    await client.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

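Pattern 4: Sequential Multi-Tool Orchestration

Problem: an AI Agent needs to call several tools in sequence, where each tool's input depends on the previous tool's output. How do you manage such a serial dependency chain cleanly?

Solution: implement a sequential orchestrator that supports dependency injection, passing intermediate results between steps, and conditional branching.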
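Stripped of MCP, retries, and timing, the idea fits in a few lines: each step builds its arguments from a shared context, may be skipped by a condition, and writes its output back for the steps after it. The three async functions here are hypothetical stand-ins for `search_docs`, `extract_summary`, and `translate_text`, so the flow runs without a server; `run_pipeline` is likewise a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Context = dict[str, Any]
Step = tuple[str, Callable[..., Awaitable[Any]], Callable[[Context], dict[str, Any]],
             Optional[Callable[[Context], bool]]]


# Hypothetical stand-ins for MCP tools so the flow runs without a server
async def search_docs(query: str) -> str:
    return f"3 documents about {query}"


async def extract_summary(text: str) -> str:
    return text.split(" about ")[-1]


async def translate_text(text: str, target_lang: str) -> str:
    return f"[{target_lang}] {text}"


async def run_pipeline(steps: list[Step], context: Context) -> Context:
    for name, tool, build_args, condition in steps:
        if condition is not None and not condition(context):
            continue  # branch not taken: later steps see no output for this step
        # Each step reads earlier outputs from the shared context and writes its own back
        context[name] = await tool(**build_args(context))
    return context


ctx = asyncio.run(run_pipeline([
    ("search", search_docs, lambda c: {"query": c["user_query"]}, None),
    ("extract", extract_summary, lambda c: {"text": c["search"]}, lambda c: bool(c.get("search"))),
    ("translate", translate_text, lambda c: {"text": c["extract"], "target_lang": "en"},
     lambda c: bool(c.get("extract"))),
], {"user_query": "MCP"}))
print(ctx["translate"])  # [en] MCP
```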

Sequential Orchestration Flow

User input: "Search the MCP docs and translate the summary"
       │
       ▼
┌──────────────┐    result    ┌──────────────┐    result    ┌──────────────┐
│    Tool 1    │ ───────────> │    Tool 2    │ ───────────> │    Tool 3    │
│ Search docs  │              │ Summarize    │              │ Translate    │
└──────────────┘              └──────────────┘              └──────────────┘
       │                             │                             │
       │ steps.search                │ steps.extract               │ steps.translate
       ▼                             ▼                             ▼
 Context Store                 Context Store                 Context Store
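Arguments such as `$user_query` or `$steps.search` in the orchestrator below are placeholders resolved against this context store. One pitfall is worth isolating: `SequentialOrchestrator` stores step outputs under flat keys like `steps.search`, so a lookup that only walks nested dicts would look for `context["steps"]["search"]` and come back empty. A sketch that checks the literal key first and then falls back to a dotted walk through dicts and list indices (`deep_get` and `resolve_ref` are hypothetical names):

```python
from typing import Any


def deep_get(data: Any, path: str) -> Any:
    """Walk a dotted path through dicts and list indices; None if any part is missing."""
    current = data
    for key in path.split("."):
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return None
    return current


def resolve_ref(value: Any, context: dict[str, Any]) -> Any:
    """Resolve a "$path" placeholder against the context; other values pass through."""
    if not (isinstance(value, str) and value.startswith("$")):
        return value
    path = value[1:]
    # A literal flat key such as "steps.search" wins over a nested walk
    return context[path] if path in context else deep_get(context, path)


context = {
    "user_query": "MCP",
    "steps.search": "raw search results",
    "meta": {"hits": [{"title": "Intro to MCP"}]},
}
print(resolve_ref("$steps.search", context))
print(resolve_ref("$meta.hits.0.title", context))
print(resolve_ref("$missing.key", context))
print(resolve_ref(5, context))
```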

Full Implementation

import asyncio
import json
import re
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class StepResult:
    step_name: str
    tool_name: str
    input_args: dict[str, Any]
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


@dataclass
class PipelineStep:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    condition: Callable[[dict[str, Any]], bool] | None = None
    timeout: float = 30.0
    retry_count: int = 0

    def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
        if callable(self.arguments):
            return self.arguments(context)
        resolved = {}
        for key, value in self.arguments.items():
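            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                # Step outputs are stored under flat keys such as "steps.search", which a
                # purely nested lookup would miss; try the literal key first
                resolved[key] = context[ref_path] if ref_path in context else _deep_get(context, ref_path)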
            else:
                resolved[key] = value
        return resolved


def _deep_get(data: dict, path: str) -> Any:
    keys = path.split(".")
    current = data
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else None
        else:
            return None
    return current


class SequentialOrchestrator:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery
        self._steps: list[PipelineStep] = []
        self._context: dict[str, Any] = {}

    def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
        self._steps.append(step)
        return self

    def set_context(self, key: str, value: Any):
        self._context[key] = value

    async def execute(self) -> list[StepResult]:
        results = []
        context = dict(self._context)

        for step in self._steps:
            if step.condition and not step.condition(context):
                logger.info(f"Skipping step '{step.name}': condition not met")
                continue

            resolved_args = step.resolve_arguments(context)
            logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= step.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(step.tool_name, resolved_args),
                        timeout=step.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000

                    context[f"steps.{step.name}"] = output
                    context["last_result"] = output

                    results.append(StepResult(
                        step_name=step.name,
                        tool_name=step.tool_name,
                        input_args=resolved_args,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    ))
                    break

                except Exception as e:
                    # str() of a TimeoutError is empty, so keep the exception type too
                    last_error = f"{type(e).__name__}: {e}"
                    attempt += 1
                    if attempt <= step.retry_count:
                        logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
                        await asyncio.sleep(1.0 * attempt)
            else:
                duration = (time.monotonic() - start) * 1000
                results.append(StepResult(
                    step_name=step.name,
                    tool_name=step.tool_name,
                    input_args=resolved_args,
                    output=None,
                    success=False,
                    error=last_error,
                    duration_ms=duration,
                ))
                break

        return results

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = SequentialOrchestrator(discovery)
    orchestrator.set_context("user_query", "MCP protocol parsing and Python client integration")

    orchestrator.add_step(PipelineStep(
        name="search",
        tool_name="search_docs",
        arguments={"query": "$user_query", "limit": 5},
    ))

    orchestrator.add_step(PipelineStep(
        name="extract",
        tool_name="extract_summary",
        arguments={"text": "$steps.search"},
        condition=lambda ctx: bool(ctx.get("steps.search")),
    ))

    orchestrator.add_step(PipelineStep(
        name="translate",
        tool_name="translate_text",
        arguments={"text": "$steps.extract", "target_lang": "en"},
        condition=lambda ctx: bool(ctx.get("steps.extract")),
    ))

    results = await orchestrator.execute()

    for r in results:
        status = "✓" if r.success else "✗"
        print(f"{status} [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
        if r.error:
            print(f"  Error: {r.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

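Pattern 5: Parallel Multi-Tool Orchestration

Problem: when tool calls have no dependencies on each other, running them one by one wastes time. How do you call several MCP tools in parallel safely and aggregate the results?

Solution: build a parallel orchestrator on asyncio with concurrency limits, result aggregation, and DAG dependency execution.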
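The two core ingredients are a semaphore that caps how many tool calls are in flight and `asyncio.gather(..., return_exceptions=True)`, so one failing call neither cancels nor hides the others. The sketch below simulates tool latency with `asyncio.sleep` (the `run_bounded` helper and the latencies are hypothetical) and records the peak concurrency, which should never exceed the limit; a per-call `asyncio.wait_for` timeout keeps one slow tool from stalling the batch.

```python
import asyncio
from typing import Any


async def run_bounded(calls: dict[str, float], max_concurrency: int,
                      timeout: float) -> tuple[dict[str, Any], int]:
    """Run hypothetical tool calls (name -> simulated latency in seconds) with at most
    `max_concurrency` in flight; return per-call outcomes and the peak concurrency seen."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def call(name: str, latency: float) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                # asyncio.sleep stands in for a real tools/call round trip
                await asyncio.wait_for(asyncio.sleep(latency), timeout=timeout)
                return f"{name} ok"
            finally:
                in_flight -= 1

    # return_exceptions=True: a failed call becomes a value instead of aborting the batch
    outcomes = await asyncio.gather(*(call(n, t) for n, t in calls.items()), return_exceptions=True)
    results = {
        name: f"error: {type(o).__name__}" if isinstance(o, BaseException) else o
        for name, o in zip(calls, outcomes)
    }
    return results, peak


results, peak = asyncio.run(run_bounded(
    {"search_docs": 0.05, "query_db": 0.05, "call_api": 0.05, "slow_tool": 0.5},
    max_concurrency=2, timeout=0.2,
))
print(results, peak)
```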

Parallel Orchestration Architecture

                  ┌──────────────┐
                  │ User request │
                  └───────┬──────┘
                          │
          ┌───────────────┼───────────────┐
          │               │               │
          ▼               ▼               ▼
    ┌────────────┐  ┌────────────┐  ┌────────────┐
    │   Tool A   │  │   Tool B   │  │   Tool C   │
    │ Search docs│  │ Query DB   │  │ Call API   │
    └─────┬──────┘  └─────┬──────┘  └─────┬──────┘
          │               │               │
          └───────────────┼───────────────┘
                          │
                          ▼
                ┌───────────────────┐
                │ Aggregate results │
                └───────────────────┘
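`ParallelOrchestrator.execute` below runs tasks in waves: every task whose dependencies have finished starts together, and the next wave begins only when the whole wave is done. The same schedule can be computed up front as a level-by-level topological sort, which also lets you reject a bad graph before any tool runs. This sketch (`execution_waves` is a hypothetical name) separates unknown dependency names from genuine cycles, two cases the orchestrator's loop reports the same way.

```python
def execution_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves; a task runs only after everything it depends on.
    Raises ValueError for unknown dependency names or cycles."""
    unknown = sorted({d for ds in deps.values() for d in ds if d not in deps})
    if unknown:
        raise ValueError(f"unknown dependencies: {unknown}")
    done: set[str] = set()
    remaining = dict(deps)
    waves: list[list[str]] = []
    while remaining:
        # Everything whose dependencies are all finished can start together
        ready = sorted(name for name, ds in remaining.items() if all(d in done for d in ds))
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


print(execution_waves({
    "search_docs": [],
    "query_db": [],
    "call_api": [],
    "aggregate": ["search_docs", "query_db", "call_api"],
}))  # [['call_api', 'query_db', 'search_docs'], ['aggregate']]
```

The trade-off of wave scheduling is that each wave waits for its slowest task, even when a task in the next wave depends only on a faster one.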

Full Implementation

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class ParallelTask:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    depends_on: list[str] = field(default_factory=list)
    timeout: float = 30.0
    retry_count: int = 0


@dataclass
class TaskResult:
    name: str
    tool_name: str
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


class ParallelOrchestrator:
    def __init__(self, discovery: ToolDiscovery, max_concurrency: int = 5):
        self._discovery = discovery
        self._max_concurrency = max_concurrency
        self._tasks: list[ParallelTask] = []

    def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
        self._tasks.append(task)
        return self

    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        context = dict(initial_context or {})
        results: dict[str, TaskResult] = {}
        semaphore = asyncio.Semaphore(self._max_concurrency)
        completed = set()
        remaining = list(self._tasks)

        while remaining:
            ready = [
                t for t in remaining
                if all(dep in completed for dep in t.depends_on)
            ]

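            if not ready:
                # Nothing can run: a dependency cycle or a dependency on an unknown task
                logger.error("Circular or unresolvable dependency detected")
                break

            # Skip tasks whose dependencies failed instead of calling them with missing inputs
            runnable = []
            for task in ready:
                failed_deps = [dep for dep in task.depends_on if not results[dep].success]
                if failed_deps:
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=f"Skipped: dependency failed ({', '.join(failed_deps)})",
                    )
                    completed.add(task.name)
                    remaining.remove(task)
                else:
                    runnable.append(task)

            coros = [self._execute_task(task, context, results, semaphore) for task in runnable]
            task_results = await asyncio.gather(*coros, return_exceptions=True)

            for task, result in zip(runnable, task_results):
                if isinstance(result, Exception):
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=str(result),
                    )
                else:
                    results[task.name] = result
                    context[f"tasks.{task.name}"] = result.output

                completed.add(task.name)
                remaining.remove(task)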

        return results

    async def _execute_task(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
        semaphore: asyncio.Semaphore,
    ) -> TaskResult:
        async with semaphore:
            resolved_args = self._resolve_arguments(task, context, results)

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= task.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(task.tool_name, resolved_args),
                        timeout=task.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000
                    return TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    )
                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= task.retry_count:
                        await asyncio.sleep(0.5 * attempt)

            duration = (time.monotonic() - start) * 1000
            return TaskResult(
                name=task.name,
                tool_name=task.tool_name,
                output=None,
                success=False,
                error=last_error,
                duration_ms=duration,
            )

    def _resolve_arguments(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
    ) -> dict[str, Any]:
        if callable(task.arguments):
            return task.arguments(context)

        resolved = {}
        for key, value in task.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                if ref_path.startswith("tasks."):
                    task_name = ref_path.split(".")[1]
                    if task_name in results:
                        resolved[key] = results[task_name].output
                else:
                    resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


class DagOrchestrator(ParallelOrchestrator):
    def validate_dag(self) -> list[str]:
        errors = []
        task_names = {t.name for t in self._tasks}
        for task in self._tasks:
            for dep in task.depends_on:
                if dep not in task_names:
                    errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")

        visited = set()
        path = set()

        def has_cycle(name: str) -> bool:
            visited.add(name)
            path.add(name)
            task = next((t for t in self._tasks if t.name == name), None)
            if task:
                for dep in task.depends_on:
                    if dep in path:
                        return True
                    if dep not in visited and has_cycle(dep):
                        return True
            path.remove(name)
            return False

        for task in self._tasks:
            if task.name not in visited:
                if has_cycle(task.name):
                    errors.append("Circular dependency detected in DAG")
                    break

        return errors
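validate_dag finds cycles with a recursive depth-first search. A common alternative, swapped in here purely as an illustration (it is not what the class above does), is Kahn's algorithm: it rejects cycles and unknown dependencies and also produces the execution levels, which correspond to the rounds that execute() runs one after another. A sketch over a plain name-to-dependencies mapping (topological_levels is a hypothetical helper):

```python
def topological_levels(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group DAG nodes into levels; each node depends only on nodes in earlier levels.

    deps maps task name -> names it depends on.
    Raises ValueError on unknown dependencies or cycles.
    """
    for name, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise ValueError(f"Task '{name}' depends on unknown task '{parent}'")

    # In-degree = number of unmet dependencies; children = reverse edges
    indegree = {name: len(parents) for name, parents in deps.items()}
    children: dict[str, list[str]] = {name: [] for name in deps}
    for name, parents in deps.items():
        for parent in parents:
            children[parent].append(name)

    levels: list[list[str]] = []
    current = [name for name, degree in indegree.items() if degree == 0]
    visited = 0
    while current:
        levels.append(current)
        visited += len(current)
        next_level = []
        for name in current:
            for child in children[name]:
                indegree[child] -= 1
                if indegree[child] == 0:  # all of its dependencies are now done
                    next_level.append(child)
        current = next_level

    if visited != len(deps):  # some nodes never reached in-degree 0 -> cycle
        raise ValueError("Circular dependency detected in DAG")
    return levels
```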

Usage example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)

    orchestrator.add_task(ParallelTask(
        name="search_web",
        tool_name="web_search",
        arguments={"query": "MCP protocol parsing 2026"},
    ))

    orchestrator.add_task(ParallelTask(
        name="search_docs",
        tool_name="search_docs",
        arguments={"query": "Model Context Protocol Python"},
    ))

    orchestrator.add_task(ParallelTask(
        name="query_db",
        tool_name="query_database",
        arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
    ))

    # A "$tasks.<name>" reference must be the whole string value, so combine several
    # upstream outputs with a callable that reads them from the shared context
    orchestrator.add_task(ParallelTask(
        name="merge_results",
        tool_name="merge_and_deduplicate",
        arguments=lambda ctx: {
            "sources": [
                ctx.get("tasks.search_web"),
                ctx.get("tasks.search_docs"),
                ctx.get("tasks.query_db"),
            ],
        },
        depends_on=["search_web", "search_docs", "query_db"],
    ))

    results = await orchestrator.execute({"user_query": "MCP client integration"})

    for name, result in results.items():
        status = "✓" if result.success else "✗"
        print(f"{status} [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
        if result.error:
            print(f"  Error: {result.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

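Pattern 6: Error Handling and Retries

Problem: When the network is unstable, the server is overloaded, or a tool throws, an MCP client needs robust error handling and retries.

Solution: Implement layered error handling with exponential-backoff retries, a circuit breaker, and fallback handlers for graceful degradation.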
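Exponential backoff makes the wait before retry n grow as base_delay * exponential_base ** (n - 1), capped at max_delay; jitter then scales each wait randomly so that many clients do not retry in lockstep. A minimal sketch of the resulting schedule, with parameter names mirroring RetryConfig in this pattern and a hypothetical rng argument added so the output is reproducible:

```python
from __future__ import annotations

import random


def backoff_delays(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    rng: random.Random | None = None,
) -> list[float]:
    """Delays slept between attempts: one fewer than max_attempts."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(1, max_attempts):  # nothing is slept after the final attempt
        delay = min(base_delay * exponential_base ** (attempt - 1), max_delay)
        if jitter:
            # Scale into roughly [0.5, 1.0) of the nominal delay
            delay *= 0.5 + rng.random() * 0.5
        delays.append(delay)
    return delays
```

With the default of 3 attempts there are only two waits, exactly 1.0 s and 2.0 s when jitter is disabled.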

Error handling architecture

┌────────────────────────────────────────────┐
│              Error Handling Layer           │
│                                            │
│  ┌──────────┐  ┌──────────┐  ┌─────────┐  │
│  │ Retry    │  │ Circuit  │  │ Fallback│  │
│  │ Strategy │  │ Breaker  │  │ Handler │  │
│  └──────────┘  └──────────┘  └─────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │        Error Classification          │  │
│  │  Transient / Permanent / Unknown     │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
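The Error Classification box can be driven by JSON-RPC error codes. JSON-RPC 2.0 reserves -32700 and -32600 to -32603 for predefined errors and -32000 to -32099 for implementation-defined server errors, and MCP uses -32002 for "resource not found". A sketch of one possible mapping; treating the rest of the server-error range as transient is a policy assumption you may want to tune per server:

```python
from enum import Enum


class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


# Codes that signal a malformed or unsatisfiable request: retrying cannot help
PERMANENT_CODES = {
    -32700,  # Parse error (JSON-RPC 2.0)
    -32600,  # Invalid Request (JSON-RPC 2.0)
    -32601,  # Method not found (JSON-RPC 2.0)
    -32602,  # Invalid params (JSON-RPC 2.0)
    -32002,  # Resource not found (MCP)
}


def classify_code(code: int) -> ErrorCategory:
    """Map a JSON-RPC error code to a retry category (one possible policy)."""
    if code in PERMANENT_CODES:
        return ErrorCategory.PERMANENT
    if code == -32603 or -32099 <= code <= -32000:
        # Internal error and the remaining implementation-defined server errors:
        # assumed transient here, which is a policy choice, not a spec rule
        return ErrorCategory.TRANSIENT
    return ErrorCategory.UNKNOWN
```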

Full implementation

import asyncio
import logging
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: list[int] = field(default_factory=lambda: [
        -32603,  # Internal error
        -32000,  # Server error
    ])


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig | None = None):
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0
        self._half_open_calls = 0

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.HALF_OPEN:
            if self._half_open_calls < self._config.half_open_max_calls:
                self._half_open_calls += 1
                return True
            return False
        return False

    def record_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._success_count = 0
        else:
            self._failure_count = 0

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.OPEN
            self._success_count = 0
        elif self._failure_count >= self._config.failure_threshold:
            self._state = CircuitState.OPEN


class ResilientMcpClient:
    def __init__(
        self,
        client,
        retry_config: RetryConfig | None = None,
        circuit_config: CircuitBreakerConfig | None = None,
    ):
        self._client = client
        self._retry_config = retry_config or RetryConfig()
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_config or CircuitBreakerConfig()
        self._fallback_handlers: dict[str, Callable] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self._circuit_breakers:
            self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
        return self._circuit_breakers[server_name]

    def register_fallback(self, tool_name: str, handler: Callable):
        self._fallback_handlers[tool_name] = handler

    def classify_error(self, error: Exception) -> ErrorCategory:
        if isinstance(error, (ConnectionError, asyncio.TimeoutError, OSError)):
            return ErrorCategory.TRANSIENT
        if isinstance(error, McpError):
            if error.code in self._retry_config.retryable_errors:
                return ErrorCategory.TRANSIENT
            if error.code in [-32600, -32601, -32602]:
                return ErrorCategory.PERMANENT
        return ErrorCategory.UNKNOWN

    async def call_with_retry(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        server_name: str = "default",
    ) -> Any:
        breaker = self._get_breaker(server_name)

        if not breaker.allow_request():
            fallback = self._fallback_handlers.get(tool_name)
            if fallback:
                logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
                return await fallback(arguments)
            raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")

        last_error = None
        for attempt in range(1, self._retry_config.max_attempts + 1):
            try:
                result = await self._client.call_tool(tool_name, arguments)
                breaker.record_success()
                return result
            except Exception as e:
                last_error = e
                category = self.classify_error(e)

                if category == ErrorCategory.PERMANENT:
                    breaker.record_failure()
                    raise

                if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
                    delay = min(
                        self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
                        self._retry_config.max_delay,
                    )
                    if self._retry_config.jitter:
                        import random
                        delay *= (0.5 + random.random() * 0.5)

                    logger.warning(
                        f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
                else:
                    # UNKNOWN errors (or a TRANSIENT one on the last attempt): record it and stop retrying
                    breaker.record_failure()
                    break

        if fallback := self._fallback_handlers.get(tool_name):
            logger.warning(f"All retries exhausted for '{tool_name}', using fallback")
            return await fallback(arguments)

        raise last_error


class CircuitOpenError(Exception):
    pass

Usage example

import json


async def fallback_search(arguments: dict) -> str:
    return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})


async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    resilient = ResilientMcpClient(
        discovery,
        retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
        circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
    )
    resilient.register_fallback("search_docs", fallback_search)

    try:
        result = await resilient.call_with_retry("search_docs", {"query": "MCP protocol parsing"})
        print(f"Result: {result}")
    except CircuitOpenError as e:
        print(f"Service unavailable: {e}")
    except Exception as e:
        print(f"Failed: {e}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

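Pattern 7: Streaming Response Handling

Problem: Long-running tool calls need real-time progress feedback. MCP supports streamed updates through SSE and notifications.

Solution: Implement a stream handler that supports progress notifications, partial-result aggregation, and cancellation.

Streaming response architecture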

┌──────────┐  tools/call   ┌──────────┐
│  Client  │ ────────────> │  Server  │
│          │               │          │
│  Stream  │ <── notify ── │  Tool    │
│  Handler │   progress    │  Running │
│          │ <── notify ── │          │
│          │   progress    │          │
│          │ <── result ── │          │
│          │   complete    │          │
└──────────┘               └──────────┘
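In the MCP specification, a server sends notifications/progress only if the originating request carried a progressToken in params._meta, and every notification echoes that token together with a progress value and an optional total. A minimal sketch of both sides as plain dicts (build_tool_call and progress_fraction are hypothetical helpers, not part of any SDK):

```python
from __future__ import annotations

import itertools
from typing import Any

_token_counter = itertools.count(1)


def build_tool_call(request_id: int, tool_name: str, arguments: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Build a tools/call request that opts in to progress notifications."""
    token = f"progress-{next(_token_counter)}"
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": tool_name,
            "arguments": arguments,
            # Servers only report progress for requests that carry a token
            "_meta": {"progressToken": token},
        },
    }
    return request, token


def progress_fraction(notification: dict[str, Any], token: str) -> float | None:
    """Progress in [0, 1] for our token, or None if not ours or total is unknown."""
    if notification.get("method") != "notifications/progress":
        return None
    params = notification.get("params", {})
    if params.get("progressToken") != token:
        return None  # progress for some other request
    total = params.get("total")
    if not total:
        return None  # total is optional; only the raw progress value is known
    return min(params.get("progress", 0) / total, 1.0)
```

Without _meta.progressToken, the progress handlers in this pattern may never fire, so it is worth checking whether your call_tool path sets it.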

Full implementation

import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum

logger = logging.getLogger(__name__)


class StreamEventType(Enum):
    PROGRESS = "progress"
    PARTIAL_RESULT = "partial_result"
    COMPLETE = "complete"
    ERROR = "error"
    LOG = "log"


@dataclass
class StreamEvent:
    type: StreamEventType
    data: Any
    progress: float | None = None
    message: str | None = None
    timestamp: float = field(default_factory=lambda: asyncio.get_event_loop().time())


@dataclass
class StreamResult:
    events: list[StreamEvent] = field(default_factory=list)
    final_result: Any = None
    success: bool = True
    error: str | None = None
    total_duration_ms: float = 0.0


class StreamHandler:
    def __init__(self):
        self._progress_handlers: list[Callable[[StreamEvent], Any]] = []
        self._partial_handlers: list[Callable[[StreamEvent], Any]] = []
        self._complete_handlers: list[Callable[[StreamEvent], Any]] = []
        self._error_handlers: list[Callable[[StreamEvent], Any]] = []
        self._log_handlers: list[Callable[[StreamEvent], Any]] = []

    def on_progress(self, handler: Callable[[StreamEvent], Any]):
        self._progress_handlers.append(handler)

    def on_partial_result(self, handler: Callable[[StreamEvent], Any]):
        self._partial_handlers.append(handler)

    def on_complete(self, handler: Callable[[StreamEvent], Any]):
        self._complete_handlers.append(handler)

    def on_error(self, handler: Callable[[StreamEvent], Any]):
        self._error_handlers.append(handler)

    def on_log(self, handler: Callable[[StreamEvent], Any]):
        self._log_handlers.append(handler)

    async def handle_notification(self, message: dict[str, Any]):
        # Expects the full JSON-RPC notification; the payload lives under "params"
        method = message.get("method", "")
        params = message.get("params") or {}

        if method == "notifications/progress":
            total = params.get("total")  # optional in the spec
            event = StreamEvent(
                type=StreamEventType.PROGRESS,
                data=params,
                # Without a total no fraction can be computed; the raw values stay in data
                progress=params.get("progress", 0) / total if total else None,
                message=params.get("message"),
            )
            for handler in self._progress_handlers:
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Progress handler error: {e}")

        elif method == "notifications/message":
            level = params.get("level", "info")
            data = params.get("data")
            # "data" may be any JSON value, not necessarily an object with a "text" key
            text = data.get("text", data) if isinstance(data, dict) else data
            event = StreamEvent(
                type=StreamEventType.LOG,
                data=data,
                message=f"[{level}] {text}",
            )
            for handler in self._log_handlers:
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Log handler error: {e}")


class StreamingMcpClient:
    def __init__(self, client, discovery: ToolDiscovery):
        self._client = client
        self._discovery = discovery
        self._stream_handler = StreamHandler()
        self._active_calls: dict[str, asyncio.Task] = {}

        client.on_notification(
            "notifications/progress",
            self._stream_handler.handle_notification,
        )
        client.on_notification(
            "notifications/message",
            self._stream_handler.handle_notification,
        )

    @property
    def stream(self) -> StreamHandler:
        return self._stream_handler

    async def call_with_stream(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        timeout: float = 120.0,
    ) -> StreamResult:
        import time
        start = time.monotonic()
        result = StreamResult()

        try:
            call_result = await asyncio.wait_for(
                self._discovery.call_tool(tool_name, arguments),
                timeout=timeout,
            )
            result.final_result = call_result
            result.success = True
            event = StreamEvent(type=StreamEventType.COMPLETE, data=call_result)
        except asyncio.TimeoutError:
            result.success = False
            result.error = f"Tool call timed out after {timeout}s"
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
        except Exception as e:
            result.success = False
            result.error = str(e)
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=str(e))

        result.events.append(event)
        # Dispatch the terminal event to the handlers registered via on_complete / on_error
        # (registering new lambdas on every call would leak handlers)
        handlers = (
            self._stream_handler._complete_handlers
            if result.success
            else self._stream_handler._error_handlers
        )
        for handler in handlers:
            try:
                outcome = handler(event)
                if asyncio.iscoroutine(outcome):
                    await outcome
            except Exception as exc:
                logger.error(f"Stream handler error: {exc}")

        result.total_duration_ms = (time.monotonic() - start) * 1000
        return result

    async def cancel(self, request_id: int | str):
        await self._client.send_notification("notifications/cancelled", {
            "requestId": request_id,
            "reason": "User cancelled",
        })
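When asyncio.wait_for times out in call_with_stream, it only abandons the local await; the server keeps running the tool. The cancel() method sends notifications/cancelled, but it needs the request's JSON-RPC id. A sketch tying the two together, assuming hypothetical send_request and send_notification callables and that the caller knows which id the request was sent with:

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def call_with_cancel_on_timeout(
    send_request: Callable[[], Awaitable[Any]],
    send_notification: Callable[[str, dict[str, Any]], Awaitable[None]],
    request_id: int | str,
    timeout: float,
) -> Any:
    """Await a request; on timeout, ask the server to stop via notifications/cancelled."""
    try:
        return await asyncio.wait_for(send_request(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for only cancels the local await; the server keeps working unless notified
        await send_notification(
            "notifications/cancelled",
            {"requestId": request_id, "reason": f"Client timeout after {timeout}s"},
        )
        raise


async def demo() -> list[tuple[str, dict[str, Any]]]:
    """Usage example with fakes: a slow request and a notification recorder."""
    sent: list[tuple[str, dict[str, Any]]] = []

    async def slow_request() -> Any:
        await asyncio.sleep(1.0)  # simulates a long-running tools/call
        return "done"

    async def record_notification(method: str, params: dict[str, Any]) -> None:
        sent.append((method, params))

    try:
        await call_with_cancel_on_timeout(slow_request, record_notification, request_id=42, timeout=0.05)
    except asyncio.TimeoutError:
        pass  # the caller still sees the timeout
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```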

Usage example

async def main():
    client = JsonRpcClient(timeout=120.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    streaming = StreamingMcpClient(client, discovery)

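    def show_progress(e: StreamEvent) -> None:
        # progress is None when the server omits "total"
        pct = f"{e.progress:.0%}" if e.progress is not None else "?"
        print(f"  Progress: {pct} - {e.message or ''}")

    streaming.stream.on_progress(show_progress)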
    streaming.stream.on_log(lambda e: print(f"  Log: {e.message}"))
    streaming.stream.on_complete(lambda e: print(f"  Complete!"))
    streaming.stream.on_error(lambda e: print(f"  Error: {e.message}"))

    print("Calling long-running tool with streaming...")
    result = await streaming.call_with_stream(
        "batch_process",
        {"items": list(range(100)), "operation": "analyze"},
        timeout=120.0,
    )

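    print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
    if result.final_result is not None:
        # final_result may be a string or structured content, so stringify before truncating
        print(f"Output: {str(result.final_result)[:200]}...")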

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

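5 Common Pitfalls and How to Fix Them

1. Skipping the initialized notification after the handshake

Problem: After sending the initialize request, the client never sends the notifications/initialized notification. The server keeps waiting for the handshake to finish, and depending on the implementation, later requests are rejected or hang until they time out.

Solution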

async def safe_initialize(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "1.0.0"},
    })
    # The initialized notification is mandatory: it completes the handshake
    await client.send_notification("notifications/initialized")
    return result

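2. Sending requests before the SSE endpoint event arrives

Problem: The client sends JSON-RPC requests immediately after the SSE connection opens, before the endpoint event has arrived, so the requests go to the wrong URL.

Solution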

class SafeSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, data):
        self._endpoint_ready.set()

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
        # The endpoint is known now, so it is safe to send requests
        ...
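In the HTTP+SSE transport (the remote transport that the 2025-03-26 spec replaces with Streamable HTTP), the server's first SSE event is endpoint, whose data is the URI the client must POST JSON-RPC messages to, frequently a relative path. A stdlib-only sketch of extracting and resolving it; parse_sse and resolve_endpoint are hypothetical helpers, and the parser handles only the event and data fields of the SSE format:

```python
from __future__ import annotations

from urllib.parse import urljoin


def parse_sse(raw: str) -> list[tuple[str, str]]:
    """Minimal SSE parser: returns (event, data) pairs; a blank line ends each event."""
    events = []
    event_type, data_lines = "message", []
    for line in raw.splitlines():
        if not line:  # blank line dispatches the current event
            if data_lines:
                events.append((event_type, "\n".join(data_lines)))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field_name, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field_name == "event":
                event_type = value
            elif field_name == "data":
                data_lines.append(value)
    return events


def resolve_endpoint(server_url: str, raw: str) -> str | None:
    """Return the absolute POST URL from the first 'endpoint' event, if any."""
    for event_type, data in parse_sse(raw):
        if event_type == "endpoint":
            return urljoin(server_url, data)  # endpoint data is often a relative path
    return None
```

In SafeSseClient, the _on_endpoint callback would be the natural place to store this resolved URL before setting _endpoint_ready.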

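3. Schema conversion drops the required field

Problem: When converting an MCP tool schema to the OpenAI function-calling format, inputSchema is passed straight through as parameters, but some LLM APIs require the required field (and a top-level type) to be present.

Solution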

def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
    schema = dict(mcp_tool.input_schema)
    if "required" not in schema:
        schema["required"] = []
    if "type" not in schema:
        schema["type"] = "object"
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description,
            "parameters": schema,
        },
    }

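4. Race conditions on shared state during parallel calls

Problem: Multiple parallel tool calls share a single httpx.AsyncClient and one dictionary of pending asyncio.Future objects. If request IDs can collide in that dictionary, responses are matched to the wrong request.

Solution: Give each client its own request ID space, or isolate calls with a connection pool: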

class IsolatedClientPool:
    def __init__(self, base_url: str, pool_size: int = 5):
        self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
        self._index = 0
        self._lock = asyncio.Lock()

    async def get_client(self) -> McpSseClient:
        async with self._lock:
            client = self._clients[self._index % len(self._clients)]
            self._index += 1
            return client
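Responses can only be misrouted when two in-flight requests share an ID within the same pending-future map. A minimal sketch of a per-connection router that keeps IDs unique and matches out-of-order responses by ID (RequestRouter is a hypothetical name; the article's client classes may already do this internally):

```python
import asyncio
import itertools
from typing import Any


class RequestRouter:
    """One ID counter and one pending-future table per connection."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}

    def register(self) -> tuple[int, asyncio.Future]:
        request_id = next(self._ids)  # unique within this connection
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, message: dict[str, Any]) -> bool:
        future = self._pending.pop(message.get("id"), None)
        if future is None or future.done():
            return False  # unknown or already-settled id: drop it instead of misrouting
        if "error" in message:
            future.set_exception(RuntimeError(message["error"].get("message", "error")))
        else:
            future.set_result(message.get("result"))
        return True


async def demo() -> list[Any]:
    router = RequestRouter()
    (id1, f1), (id2, f2) = router.register(), router.register()
    # Responses arrive out of order; routing by id still pairs them correctly
    router.resolve({"jsonrpc": "2.0", "id": id2, "result": "second"})
    router.resolve({"jsonrpc": "2.0", "id": id1, "result": "first"})
    return [await f1, await f2]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```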

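5. Confusing progress notifications with results in a streaming response

Problem: Progress notifications and the final result share the same SSE stream, and the client treats a progress notification as the final result.

Solution: Strictly separate message types: only a message with an id field can be the result of a request; anything with a method and no id is a notification: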

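async def _handle_sse_message(self, message: dict):
    if "id" in message and "method" not in message:
        # A response to one of our requests: resolve the matching pending future
        future = self._pending.pop(message["id"], None)
        if future and not future.done():
            if "error" in message:
                future.set_exception(RuntimeError(message["error"].get("message", "MCP error")))
            else:
                future.set_result(message.get("result"))
    elif "method" in message and "id" not in message:
        # A notification (progress, log, ...): hand it to the stream handler
        await self._stream_handler.handle_notification(message)
    else:
        # A server-initiated request (both id and method, e.g. ping) needs its own reply path
        ...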

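Troubleshooting 10 Common Errors

Error message | Cause | Fix
--- | --- | ---
MCP Error [-32600]: Invalid Request | Malformed JSON-RPC message or missing required fields | Check that the request contains the jsonrpc, method, and id fields; validate the format with /zh-CN/json/format
MCP Error [-32601]: Method not found | The server does not support the method that was called | Call initialize first to confirm the server's capabilities; check the method name spelling
MCP Error [-32602]: Invalid params | Tool argument types or structure do not match | Fetch the correct inputSchema with tools/list and check the arguments against it
Connection refused | MCP server not running or wrong port | Confirm the server process is running; check the port and URL configuration
SSE connection timeout | Network unreachable or the server did not answer the SSE handshake | Check firewall rules; confirm the SSE endpoint is reachable
Request timed out after 30s | Tool execution exceeded the client timeout | Increase the timeout parameter, or use streaming responses to track progress
Circuit breaker open | Consecutive failures exceeded the circuit breaker threshold | Check server health; retry after the recovery timeout
Tool not found: xxx | The tool name was never registered | Re-run tools/list discovery; check the tool name's case
Endpoint not received | No endpoint event arrived after the SSE connection opened | Confirm the server sends the endpoint event; check the SSE route configuration
JSON decode error in SSE stream | SSE data is not valid JSON | Check the server's SSE event format; make sure the data field is valid JSON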
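For the -32600 row, a local pre-flight check catches most malformed requests before they reach the server. A sketch based on the JSON-RPC 2.0 request rules plus MCP's extra requirement that request IDs be strings or integers and never null (validate_request is a hypothetical helper):

```python
from typing import Any


def validate_request(msg: Any) -> list[str]:
    """Return the problems that would likely make a server answer -32600 Invalid Request."""
    if not isinstance(msg, dict):
        return ["message must be a JSON object"]
    problems = []
    if msg.get("jsonrpc") != "2.0":
        problems.append('"jsonrpc" must be exactly "2.0"')
    if not isinstance(msg.get("method"), str):
        problems.append('"method" must be a string')
    if "id" in msg:
        request_id = msg["id"]
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(request_id, bool) or not isinstance(request_id, (str, int)):
            problems.append('"id" must be a string or integer (MCP forbids null ids)')
    if "params" in msg and not isinstance(msg["params"], (dict, list)):
        problems.append('"params" must be an object or array')
    return problems
```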

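Advanced Optimization Techniques

1. Connection pooling and multi-server management

In production, an AI agent usually needs to connect to several MCP servers at once. Use a connection pool to manage multiple client instances: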

class McpConnectionPool:
    def __init__(self):
        self._connections: dict[str, Any] = {}
        self._discovery_map: dict[str, ToolDiscovery] = {}

    async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
        client = JsonRpcClient(timeout=30.0)
        await client.connect_stdio(command, args)
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    async def connect_sse(self, name: str, url: str):
        client = McpSseClient(url)
        await client.connect()
        await client.initialize()
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    def get_discovery(self, name: str) -> ToolDiscovery | None:
        return self._discovery_map.get(name)

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        for name, discovery in self._discovery_map.items():
            if discovery.registry.get(tool_name):
                return await discovery.call_tool(tool_name, arguments)
        raise ValueError(f"Tool '{tool_name}' not found in any connected server")

    async def close_all(self):
        for client in self._connections.values():
            await client.close()
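McpConnectionPool.call_tool scans every server on each call and silently uses the first match if two servers expose the same tool name. One alternative, sketched here as an assumption rather than the pool's current behavior, is to build a tool-to-server index once after discovery and fail loudly on collisions; the input shape (server name to a list of tool names) is hypothetical:

```python
def build_tool_index(server_tools: dict[str, list[str]]) -> dict[str, str]:
    """Map tool name -> server name, refusing ambiguous names instead of picking one."""
    index: dict[str, str] = {}
    for server, tools in server_tools.items():
        for tool in tools:
            if tool in index:
                raise ValueError(
                    f"Tool '{tool}' is exposed by both '{index[tool]}' and '{server}'"
                )
            index[tool] = server
    return index
```

The pool could then look up index[tool_name] and call get_discovery(server).call_tool(...) directly; prefixing tool names with the server name is another common way to avoid collisions.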

2. Tool call caching

Cache the results of idempotent tool calls to avoid repeated requests:

import hashlib
import json
import time
from typing import Any


class ToolCallCache:
    def __init__(self, ttl: float = 300.0, max_size: int = 1000):
        # key -> (timestamp when cached, value)
        self._cache: dict[str, tuple[float, Any]] = {}
        self._ttl = ttl
        self._max_size = max_size

    def _make_key(self, tool_name: str, arguments: dict) -> str:
        raw = json.dumps(arguments, sort_keys=True, ensure_ascii=False)
        # Keep the tool name readable in the key so invalidate() can match on it
        return f"{tool_name}:{hashlib.sha256(raw.encode()).hexdigest()}"

    def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
        key = self._make_key(tool_name, arguments)
        if key in self._cache:
            ts, value = self._cache[key]
            if time.monotonic() - ts < self._ttl:
                return True, value
            del self._cache[key]  # expired entry
        return False, None

    def set(self, tool_name: str, arguments: dict, value: Any):
        key = self._make_key(tool_name, arguments)
        if key not in self._cache and len(self._cache) >= self._max_size:
            # Evict the oldest entry (linear scan; acceptable for small caches)
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        self._cache[key] = (time.monotonic(), value)

    def invalidate(self, tool_name: str, arguments: dict | None = None):
        if arguments is not None:
            self._cache.pop(self._make_key(tool_name, arguments), None)
        else:
            # The hex digest contains no ":", so rsplit recovers the exact tool name
            keys_to_remove = [k for k in self._cache if k.rsplit(":", 1)[0] == tool_name]
            for k in keys_to_remove:
                del self._cache[k]
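ToolCallCache leaves it to the caller to decide which calls are safe to cache. One way to make that explicit is an allowlist of idempotent tools, sketched below with a hypothetical CachedToolCaller and an injectable clock for testing. MCP does not mark results as cacheable; the 2025-03-26 spec only adds optional tool annotations such as idempotentHint, which are hints rather than guarantees:

```python
import asyncio
import json
import time
from typing import Any, Awaitable, Callable


class CachedToolCaller:
    """Wrap a call_tool coroutine; cache results only for tools declared idempotent."""

    def __init__(
        self,
        call_tool: Callable[[str, dict[str, Any]], Awaitable[Any]],
        idempotent_tools: set[str],
        ttl: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._call_tool = call_tool
        self._idempotent = idempotent_tools
        self._ttl = ttl
        self._clock = clock
        self._cache: dict[tuple[str, str], tuple[float, Any]] = {}

    async def call(self, tool_name: str, arguments: dict[str, Any]) -> Any:
        if tool_name not in self._idempotent:
            return await self._call_tool(tool_name, arguments)  # never cache side effects
        key = (tool_name, json.dumps(arguments, sort_keys=True))
        hit = self._cache.get(key)
        if hit and self._clock() - hit[0] < self._ttl:
            return hit[1]
        result = await self._call_tool(tool_name, arguments)
        self._cache[key] = (self._clock(), result)
        return result


async def demo() -> list[int]:
    """Usage example: count how often the underlying tool really runs."""
    now = [0.0]  # fake clock so expiry is deterministic
    calls: list[str] = []

    async def fake_call_tool(tool_name: str, arguments: dict[str, Any]) -> Any:
        calls.append(tool_name)
        return {"tool": tool_name, **arguments}

    caller = CachedToolCaller(fake_call_tool, {"search_docs"}, ttl=60, clock=lambda: now[0])
    counts = []
    await caller.call("search_docs", {"query": "mcp"})
    await caller.call("search_docs", {"query": "mcp"})  # cache hit
    counts.append(len(calls))
    now[0] = 61.0  # move past the TTL
    await caller.call("search_docs", {"query": "mcp"})  # expired: fetch again
    counts.append(len(calls))
    await caller.call("send_email", {"to": "a@b.c"})
    await caller.call("send_email", {"to": "a@b.c"})  # not idempotent: always runs
    counts.append(len(calls))
    return counts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```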

3. Observability and tracing

Add OpenTelemetry tracing and metrics to MCP client calls:

from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")

tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram("mcp.tool.duration", unit="s", description="MCP tool call duration")


class ObservableMcpClient:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        with tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
            span.set_attribute("mcp.tool.name", tool_name)
            span.set_attribute("mcp.tool.args_count", len(arguments))

            import time
            start = time.monotonic()
            try:
                result = await self._discovery.call_tool(tool_name, arguments)
                duration = time.monotonic() - start

                span.set_status(Status(StatusCode.OK))
                tool_call_counter.add(1, {"tool": tool_name, "status": "success"})
                tool_call_duration.record(duration, {"tool": tool_name})

                return result
            except Exception as e:
                duration = time.monotonic() - start
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                tool_call_counter.add(1, {"tool": tool_name, "status": "error"})
                tool_call_duration.record(duration, {"tool": tool_name})
                raise

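Comparison: 3 Orchestration Approaches

Dimension | Serial | Parallel | DAG
--- | --- | --- | ---
Execution | Sequential; each step starts after the previous one finishes | All tasks run at once (up to the concurrency limit) | Tasks run level by level in topological (dependency) order
Best for | Tools with strict data dependencies | Fully independent tools | Some tools depend on others, some are independent
Latency | Sum of all tool durations | Duration of the slowest tool | Sum of durations along the critical path
Error propagation | A failed step skips all later steps | One failure does not affect the others | Downstream nodes of a failed node are skipped
Implementation complexity | Low | Medium | High
Resource usage | Low (one connection at a time) | High (needs a concurrent connection pool) | Medium (concurrency per level)
Result ordering | Strong (deterministic order) | Weak (completion order varies) | Medium (order within a level varies)
Timeout control | Independent timeout per step | Global timeout + per-task timeout | Per-step timeout + global timeout
Typical scenario | Search → extract → translate | Search 3 data sources at once | Search A + B → merge → translate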
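The latency row can be checked with a small worked example. The sketch below computes the idealized figure for each strategy, ignoring max_concurrency limits and per-call overhead; latencies is a hypothetical helper, and the durations are made-up numbers for the "search A + B → merge → translate" case:

```python
from functools import lru_cache


def latencies(durations: dict[str, float], deps: dict[str, list[str]]) -> dict[str, float]:
    """Idealized end-to-end latency per strategy (assumes deps form an acyclic graph)."""

    @lru_cache(maxsize=None)
    def finish(task: str) -> float:
        # Earliest finish time = own duration + latest finish among its dependencies
        return durations[task] + max((finish(d) for d in deps.get(task, [])), default=0.0)

    return {
        "serial": sum(durations.values()),
        "parallel": max(durations.values()),  # only meaningful when no task depends on another
        "dag": max(finish(t) for t in durations),  # length of the critical path
    }
```

With durations of 2.0 s and 3.0 s for the two searches, 1.0 s to merge, and 1.5 s to translate, serial execution takes 7.5 s while the DAG finishes in 5.5 s (the slower search, then merge, then translate).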

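How to choose

  • Tools with strict causal dependencies → serial orchestration
  • Fully independent tools → parallel orchestration
  • Partially dependent tools → DAG orchestration

For more on AI agent workflow orchestration, see AI Agent Workflow DAG Orchestration and AI Agent Protocol Comparison.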


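Recommended online tools

  • JSON formatter: when debugging MCP protocol messages, format JSON-RPC requests and responses with /zh-CN/json/format
  • cURL to code: when testing MCP server endpoints, quickly generate Python client code with /zh-CN/dev/curl-to-code
  • Base64 encode/decode: encode and decode binary data in MCP transport with /zh-CN/encode/base64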

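Summary: Python MCP client integration is far more than sending JSON-RPC requests. From basic stdio connections to remote SSE transport, from tool discovery and dynamic registration to serial, parallel, and DAG multi-tool orchestration, and from error handling with circuit breaking and fallbacks to streaming responses with progress tracking, every stage calls for production-grade engineering. With these 7 patterns, your AI agent gets reliable, efficient, and observable tool calling. Remember: MCP protocol parsing is the foundation, multi-tool orchestration is the core, error handling is the safety net, and streaming is the user experience. For the latest protocol definitions, see the official Model Context Protocol Specification.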
