Python MCP Client Integration in Practice: 7 Production Patterns, from Protocol Parsing to Multi-Tool Orchestration

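Why Can't Your AI Application Connect to External Tools? 5 Pain Points of MCP Client Integration

You spent three days writing a perfect MCP Server, and Claude Desktop cannot connect to it? Your AI Agent calls three tools, and after the first one times out, the rest all fail? Tool discovery returns a pile of schemas, and you don't know how to convert them into the function calling format an LLM can use? The SSE connection keeps dropping and reconnecting, and you lose data without noticing? In multi-tool orchestration, serial execution is too slow and parallel execution turns chaotic?

These problems share one root cause: most developers focus only on building the MCP Server and overlook the complexity of integrating on the Client side. Although the MCP protocol defines a standard way to communicate, the client still has to handle a full set of engineering problems: JSON-RPC message construction, transport-layer adaptation, tool discovery and conversion, orchestration strategy, and error recovery.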

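Key Takeaways

  • Master the JSON-RPC 2.0 communication mechanism behind MCP to build reliable client-server connections
  • Learn tool discovery and dynamic registration so the AI Agent adapts to new tools automatically
  • Understand the 7 production patterns for multi-tool orchestration, from serial calls to parallel DAG execution
  • Build solid error handling and retry mechanisms to cope with network jitter and service outages
  • Handle SSE streaming responses to deliver real-time progress feedback on tool calls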

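MCP Protocol Core Concepts

Before diving into the 7 production patterns, we need to understand where the MCP client sits in the protocol stack and which core concepts it works with.

MCP Client Architecture Overview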

┌─────────────────────────────────────────────────┐
│                  AI Application                  │
│  (Claude Desktop / ChatGPT / custom Agent)      │
├─────────────────────────────────────────────────┤
│               MCP Client Layer                   │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ JSON-RPC  │ │Transport │ │ Tool Registry  │  │
│  │ Handler   │ │ Adapter  │ │ & Discovery    │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ Orchest-  │ │ Error &  │ │ Stream         │  │
│  │ ration    │ │ Retry    │ │ Handler        │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
├─────────────────────────────────────────────────┤
│              Transport Layer                     │
│     stdio / SSE / Streamable HTTP               │
├─────────────────────────────────────────────────┤
│              MCP Server                          │
│  Tools / Resources / Prompts                    │
└─────────────────────────────────────────────────┘

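Core Concepts Reference Table

Concept        | Description                                 | JSON-RPC method                  | Analogy
---------------+---------------------------------------------+----------------------------------+--------------------------
Initialize     | Handshake between client and server         | initialize                       | TCP three-way handshake
Tool Discovery | Fetch the list of tools the server supports | tools/list                       | DNS lookup
Tool Call      | Call a named tool and get its result        | tools/call                       | HTTP POST
Resource Read  | Read a resource exposed by the server       | resources/read                   | HTTP GET
Prompt Get     | Fetch a predefined prompt template          | prompts/get                      | Template engine
Notification   | One-way message that expects no response    | (any method, sent without an id) | UDP broadcast
Cancel         | Cancel an in-flight request                 | notifications/cancelled          | HTTP request cancellation

JSON-RPC 2.0 Message Format

The MCP protocol is built on JSON-RPC 2.0, so understanding the message format is the foundation of client development: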

// Request message
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "search_docs",
    "arguments": {"query": "MCP protocol"}
  },
  "id": 1
}

// Success response
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {"type": "text", "text": "Search results..."}
    ]
  },
  "id": 1
}

// Error response
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32600,
    "message": "Invalid Request"
  },
  "id": 1
}
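
The three shapes above differ only in which keys are present, so a client can sort incoming messages before routing them. The following is a minimal sketch of that rule; the helper name `classify_message` is made up for illustration and is not part of any MCP library.

```python
import json
from typing import Any


def classify_message(message: dict[str, Any]) -> str:
    """Return 'request', 'notification', 'response', or 'error' for a JSON-RPC 2.0 message."""
    if message.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "method" in message:
        # A method with an id expects a reply; without an id it is a one-way notification.
        return "request" if "id" in message else "notification"
    if "error" in message:
        return "error"
    if "result" in message:
        return "response"
    raise ValueError("message has neither method, result, nor error")


request = json.loads('{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "search_docs"}, "id": 1}')
notification = {"jsonrpc": "2.0", "method": "notifications/initialized"}
failure = {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": 1}

print(classify_message(request))       # request
print(classify_message(notification))  # notification
print(classify_message(failure))       # error
```

Checking for `method` first matters because a server can also send requests to the client, and those carry both `method` and `id`.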

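If you are interested in MCP Server development, see our earlier article, Python MCP Server Development in Practice. If you need to understand the lower-level AI function calling protocol, we recommend AI Function Calling in Production.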


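Pattern 1: Basic JSON-RPC Client

Problem: Your AI application needs to connect to an MCP Server, but you don't know how to build correct JSON-RPC messages, manage request IDs, or handle responses.

Solution: Build a complete JSON-RPC 2.0 client that encapsulates message construction, sending, response matching, and timeout handling.

Full Implementation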

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class JsonRpcRequest:
    jsonrpc: str = "2.0"
    method: str = ""
    params: dict[str, Any] = field(default_factory=dict)
    id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)


@dataclass
class JsonRpcResponse:
    id: int | str
    result: Any = None
    error: dict[str, Any] | None = None


class JsonRpcClient:
    def __init__(self, timeout: float = 30.0):
        self._request_id = 0
        self._pending: dict[int | str, asyncio.Future] = {}
        self._timeout = timeout
        self._notification_handlers: dict[str, Callable] = {}
        self._writer: asyncio.StreamWriter | None = None
        self._reader: asyncio.StreamReader | None = None

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
        import os
        proc_env = {**os.environ, **(env or {})}
        self._process = await asyncio.create_subprocess_exec(
            command,
            *(args or []),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Let the server's stderr pass through: a piped stderr that nobody
            # reads can fill its buffer and stall the server.
            stderr=None,
            env=proc_env,
        )
        self._reader = self._process.stdout
        self._writer = self._process.stdin
        # Keep a reference so the reader task is not garbage-collected mid-flight.
        self._read_task = asyncio.create_task(self._read_loop())

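    async def _read_loop(self):
        while self._reader and not self._reader.at_eof():
            line = await self._reader.readline()
            if not line:
                break  # EOF: the server closed its stdout
            try:
                message = json.loads(line.decode().strip())
            except json.JSONDecodeError:
                continue  # skip non-JSON output such as stray log lines
            await self._handle_message(message)
        # Fail outstanding requests right away instead of letting them wait for the timeout.
        for future in self._pending.values():
            if not future.done():
                future.set_exception(ConnectionError("MCP server closed the connection"))
        self._pending.clear()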

    async def _handle_message(self, message: dict):
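        # A response carries an id but no method. Server-initiated requests carry both,
        # so they go to the method handlers below instead of being matched to a pending call.
        if "id" in message and "method" not in message: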
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(
                        McpError(message["error"]["code"], message["error"]["message"])
                    )
                else:
                    future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))
        elif "method" in message:
            handler = self._notification_handlers.get(message["method"])
            if handler:
                await handler(message.get("params", {}))

    async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        request_id = self._next_id()
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
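        if not self._writer:
            # Fail fast instead of registering a future that can only time out.
            raise RuntimeError("Not connected: call connect_stdio() first")
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future

        data = json.dumps(request) + "\n"
        self._writer.write(data.encode())
        await self._writer.drain()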

        try:
            response = await asyncio.wait_for(future, timeout=self._timeout)
            return response.result
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)
            raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s")

    async def send_notification(self, method: str, params: dict[str, Any] | None = None):
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        data = json.dumps(notification) + "\n"
        if self._writer:
            self._writer.write(data.encode())
            await self._writer.drain()

    def on_notification(self, method: str, handler: Callable):
        self._notification_handlers[method] = handler

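    async def close(self):
        if self._writer:
            self._writer.close()
        process = getattr(self, "_process", None)
        if process and process.returncode is None:
            # Only signal a server that is still running; terminate() on an exited one can raise.
            process.terminate()
            await process.wait()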


class McpError(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"MCP Error [{code}]: {message}")


class McpTimeoutError(Exception):
    pass

Initialization Handshake

async def initialize_client(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {
            "roots": {"listChanged": True},
            "sampling": {},
        },
        "clientInfo": {
            "name": "toolsku-mcp-client",
            "version": "1.0.0",
        },
    })
    await client.send_notification("notifications/initialized")
    return result
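
The handshake result also tells the client which protocol revision the server settled on and which capabilities it offers. A minimal sketch of checking both is shown here. The function name `check_initialize_result`, the `SUPPORTED_VERSIONS` tuple, and the sample server info are illustrative assumptions. A client could run such a check on the `initialize` result before it sends `notifications/initialized`.

```python
SUPPORTED_VERSIONS = ("2025-03-26", "2024-11-05")  # assumption: revisions this client implements


def check_initialize_result(result: dict, supported: tuple[str, ...] = SUPPORTED_VERSIONS) -> set[str]:
    """Validate the negotiated protocol version and return the server's capability names."""
    version = result.get("protocolVersion")
    if version not in supported:
        # The caller should close the connection rather than continue the handshake.
        raise RuntimeError(f"Unsupported MCP protocol version: {version!r}")
    return set(result.get("capabilities", {}))


sample = {
    "protocolVersion": "2025-03-26",
    "capabilities": {"tools": {"listChanged": True}},
    "serverInfo": {"name": "demo-server", "version": "0.1.0"},  # hypothetical server
}
print(check_initialize_result(sample))  # {'tools'}
```

Knowing the capability set up front lets the client skip `tools/list` entirely when the server does not advertise `tools`.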

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])

    server_info = await initialize_client(client)
    print(f"Connected to: {server_info['serverInfo']['name']}")

    tools = await client.send_request("tools/list")
    for tool in tools.get("tools", []):
        # description is optional in a tool definition, so avoid a KeyError
        print(f"  - {tool['name']}: {tool.get('description', '')}")

    result = await client.send_request("tools/call", {
        "name": "search_docs",
        "arguments": {"query": "MCP protocol parsing"},
    })
    print(f"Result: {result}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

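Pattern 2: SSE Transport Layer Implementation

Problem: The stdio transport only works for communication between local processes. Production environments need to reach remote MCP Servers over the network, and SSE (Server-Sent Events) is a remote transport supported by the MCP protocol. Note that the 2025-03-26 revision of the specification introduces Streamable HTTP as the successor to this HTTP+SSE transport, so check which one your server exposes.

Solution: Implement a complete SSE transport layer, including connection management, event parsing, message routing, and automatic reconnection.

SSE Transport Architecture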

┌──────────────┐     POST /messages     ┌──────────────┐
│  MCP Client  │ ──────────────────────> │  MCP Server  │
│              │                          │              │
│  SSE Handler │ <──── GET /sse ──────── │  SSE Endpoint│
│              │     EventStream          │              │
└──────────────┘                          └──────────────┘
     │                                          │
     │  1. GET /sse opens the SSE stream        │
     │  2. endpoint event supplies POST URL     │
     │  3. POST /messages sends requests        │
     │  4. Responses arrive over SSE            │
     └──────────────────────────────────────────┘
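
Step 2 and step 4 both depend on splitting the event stream into discrete events. The sketch below shows that parsing step on its own, with no network involved. The function name `parse_sse` and the `session_id` value in the sample stream are made up for illustration.

```python
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_type, data) pairs from decoded SSE lines (without trailing newlines)."""
    event_type, data_lines = "message", []
    for line in lines:
        if line == "":
            # A blank line terminates the event; events without any data line are dropped.
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip a single leading space only
        if name == "event":
            event_type = value
        elif name == "data":
            data_lines.append(value)


stream = [
    "event: endpoint",
    "data: /messages?session_id=abc123",
    "",
    ": keep-alive",
    'data: {"jsonrpc": "2.0", "result": {}, "id": 1}',
    "",
]
events = list(parse_sse(stream))
print(events[0])  # ('endpoint', '/messages?session_id=abc123')
print(events[1][0])  # message
```

An event with no explicit `event:` field defaults to the type `message`, which is why JSON-RPC responses in the sample arrive under that name.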

Full Implementation

import asyncio
import json
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
import httpx

logger = logging.getLogger(__name__)


@dataclass
class SseEvent:
    event: str = "message"
    data: str = ""
    id: str | None = None
    retry: int | None = None


class SseTransport:
    def __init__(
        self,
        server_url: str,
        reconnect_interval: float = 5.0,
        max_reconnect_attempts: int = 10,
        headers: dict[str, str] | None = None,
    ):
        self._server_url = server_url.rstrip("/")
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_attempts = max_reconnect_attempts
        self._headers = headers or {}
        self._message_endpoint: str | None = None
        self._session: httpx.AsyncClient | None = None
        self._running = False
        self._reconnect_count = 0
        self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
        self._last_event_id: str | None = None

    async def connect(self):
        self._session = httpx.AsyncClient(
            headers={**self._headers, "Accept": "text/event-stream"},
            timeout=httpx.Timeout(30.0, read=300.0),
        )
        self._running = True
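        # Keep a reference so the listener task is not garbage-collected while running.
        self._listener_task = asyncio.create_task(self._sse_listener())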

    async def _sse_listener(self):
        while self._running:
            try:
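                sse_url = f"{self._server_url}/sse"
                # Last-Event-ID is an HTTP request header in the SSE spec, not a query parameter.
                resume_headers = {}
                if self._last_event_id:
                    resume_headers["Last-Event-ID"] = self._last_event_id

                async with self._session.stream("GET", sse_url, headers=resume_headers) as response: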
                    response.raise_for_status()
                    self._reconnect_count = 0

                    current_event = SseEvent()
                    async for line in response.aiter_lines():
                        if not self._running:
                            break

                        if line.startswith(":"):
                            continue

                        if line == "":
                            if current_event.data:
                                await self._dispatch_event(current_event)
                            current_event = SseEvent()
                            continue

                        if ":" in line:
                            field_name, _, field_value = line.partition(":")
                            # The SSE spec strips at most one leading space from the value.
                            if field_value.startswith(" "):
                                field_value = field_value[1:]

                            if field_name == "event":
                                current_event.event = field_value
                            elif field_name == "data":
                                if current_event.data:
                                    current_event.data += "\n"
                                current_event.data += field_value
                            elif field_name == "id":
                                current_event.id = field_value
                                self._last_event_id = field_value
                            elif field_name == "retry":
                                try:
                                    current_event.retry = int(field_value)
                                except ValueError:
                                    pass

            except httpx.HTTPStatusError as e:
                logger.error(f"SSE HTTP error: {e.response.status_code}")
            except httpx.RequestError as e:
                logger.error(f"SSE connection error: {e}")
            except Exception as e:
                logger.error(f"SSE unexpected error: {e}")

            if self._running:
                self._reconnect_count += 1
                if self._reconnect_count > self._max_reconnect_attempts:
                    logger.error("Max reconnect attempts reached, stopping")
                    self._running = False
                    break
                wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
                logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
                await asyncio.sleep(wait)

    async def _dispatch_event(self, event: SseEvent):
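        if event.event == "endpoint":
            # The endpoint may be a path or a full URL; urljoin resolves both against the SSE URL.
            from urllib.parse import urljoin
            self._message_endpoint = urljoin(f"{self._server_url}/sse", event.data)
            logger.info(f"Message endpoint: {self._message_endpoint}")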

        handlers = self._event_handlers.get(event.event, [])
        for handler in handlers:
            try:
                if event.event == "endpoint":
                    await handler(event.data)
                else:
                    message = json.loads(event.data)
                    await handler(message)
            except Exception as e:
                logger.error(f"Event handler error: {e}")

    async def send_message(self, message: dict[str, Any]) -> None:
        if not self._message_endpoint:
            raise RuntimeError("SSE transport not initialized, endpoint not received")
        if not self._session:
            raise RuntimeError("SSE session not connected")

        response = await self._session.post(
            self._message_endpoint,
            json=message,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()

    def on_event(self, event_type: str, handler: Callable):
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    async def close(self):
        self._running = False
        if self._session:
            await self._session.aclose()

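Integrated Usage

# McpError and McpTimeoutError are the exception classes defined in Pattern 1.
class McpSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._request_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("message", self._on_message)
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, _endpoint: str):
        self._endpoint_ready.set()

    async def connect(self, endpoint_timeout: float = 10.0):
        await self._transport.connect()
        # send_message() fails until the server announces its POST URL, so wait for that event.
        try:
            await asyncio.wait_for(self._endpoint_ready.wait(), timeout=endpoint_timeout)
        except asyncio.TimeoutError:
            raise McpTimeoutError("Timed out waiting for the SSE endpoint event")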

    async def _on_message(self, message: dict):
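        # Only responses (id without method) are matched against pending requests.
        if "id" in message and "method" not in message: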
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
                else:
                    future.set_result(message.get("result"))

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        self._request_id += 1
        # Capture the id locally: concurrent calls advance self._request_id while this one waits.
        request_id = self._request_id
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await self._transport.send_message(request)
            return await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            raise McpTimeoutError(f"Request {method} timed out")
        finally:
            # Drop the entry on every exit path, including a failed POST.
            self._pending.pop(request_id, None)

    async def initialize(self) -> dict:
        result = await self.send_request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
        })
        await self._transport.send_message({
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        })
        return result

    async def close(self):
        await self._transport.close()


async def main():
    client = McpSseClient("http://localhost:8080")
    await client.connect()
    info = await client.initialize()
    print(f"Connected: {info}")

    tools = await client.send_request("tools/list")
    print(f"Available tools: {tools}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

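Pattern 3: Tool Discovery and Dynamic Registration

Problem: An MCP Server's tool list is dynamic. The client has to discover tools automatically, convert them into the function calling format an LLM can invoke, and react to tool-change notifications.

Solution: Implement tool discovery, schema conversion, dynamic registration, and change listening.

Tool Discovery Architecture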

┌──────────────┐  tools/list   ┌──────────────┐
│  MCP Client  │ ────────────> │  MCP Server  │
│              │ <──────────── │              │
│  Tool        │  Tool Schema  │  Tool        │
│  Registry    │               │  Definitions │
│              │  tools/       │              │
│  ┌────────┐  │  list_changed │              │
│  │ OpenAI │  │ <──────────── │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Claude │  │               │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Gemini │  │               │              │
│  │ Format │  │               │              │
│  └────────┘  │               │              │
└──────────────┘               └──────────────┘

Full Implementation

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class McpTool:
    name: str
    description: str
    input_schema: dict[str, Any]
    annotations: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mcp(cls, data: dict) -> "McpTool":
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            input_schema=data.get("inputSchema", {}),
            annotations=data.get("annotations", {}),
        )


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, McpTool] = {}
        self._change_handlers: list[Callable] = []

    def register(self, tool: McpTool):
        self._tools[tool.name] = tool
        logger.info(f"Registered tool: {tool.name}")
        self._notify_change("registered", tool)

    def unregister(self, name: str):
        if name in self._tools:
            tool = self._tools.pop(name)
            logger.info(f"Unregistered tool: {name}")
            self._notify_change("unregistered", tool)

    def get(self, name: str) -> McpTool | None:
        return self._tools.get(name)

    def list_tools(self) -> list[McpTool]:
        return list(self._tools.values())

    def on_change(self, handler: Callable):
        self._change_handlers.append(handler)

    def _notify_change(self, action: str, tool: McpTool):
        for handler in self._change_handlers:
            try:
                handler(action, tool)
            except Exception as e:
                logger.error(f"Change handler error: {e}")

    def to_openai_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                },
            })
        return result

    def to_claude_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema,
            })
        return result

    def to_gemini_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            params = tool.input_schema.get("properties", {})
            required = tool.input_schema.get("required", [])

            declarations = []
            for param_name, param_schema in params.items():
                declarations.append({
                    "name": param_name,
                    "description": param_schema.get("description", ""),
                    "type": self._json_type_to_gemini(param_schema.get("type", "string")),
                    "required": param_name in required,
                })

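            result.append({
                "name": tool.name,
                "description": tool.description,
                "parameters": {
                    "type": "OBJECT",
                    # Carry each parameter's description through; the model relies on it to fill arguments.
                    "properties": {
                        d["name"]: {"type": d["type"], "description": d["description"]}
                        for d in declarations
                    },
                    "required": [d["name"] for d in declarations if d["required"]],
                },
            })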
        return result

    @staticmethod
    def _json_type_to_gemini(json_type: str) -> str:
        mapping = {
            "string": "STRING",
            "integer": "INTEGER",
            "number": "NUMBER",
            "boolean": "BOOLEAN",
            "array": "ARRAY",
            "object": "OBJECT",
        }
        return mapping.get(json_type, "STRING")


class ToolDiscovery:
    def __init__(self, client):
        self._client = client
        self._registry = ToolRegistry()
        self._client.on_notification(
            "notifications/tools/list_changed",
            self._on_tools_changed,
        )

    @property
    def registry(self) -> ToolRegistry:
        return self._registry

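    async def discover(self) -> list[McpTool]:
        tools = []
        cursor = None
        while True:
            # tools/list is paginated: keep following nextCursor until the server stops returning one.
            params = {"cursor": cursor} if cursor else None
            result = await self._client.send_request("tools/list", params)
            for tool_data in result.get("tools", []):
                tool = McpTool.from_mcp(tool_data)
                self._registry.register(tool)
                tools.append(tool)
            cursor = result.get("nextCursor")
            if not cursor:
                break
        logger.info(f"Discovered {len(tools)} tools")
        return tools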

    async def _on_tools_changed(self, params: dict):
        logger.info("Tools list changed, re-discovering...")
        self._registry._tools.clear()
        await self.discover()

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        tool = self._registry.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")

        result = await self._client.send_request("tools/call", {
            "name": name,
            "arguments": arguments,
        })

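        contents = result.get("content", [])
        if result.get("isError"):
            # Tool-level failures arrive as a normal result flagged with isError, not as a JSON-RPC error.
            detail = " ".join(c.get("text", "") for c in contents if c.get("type") == "text")
            raise RuntimeError(f"Tool '{name}' reported an error: {detail}")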
        text_parts = []
        for content in contents:
            if content.get("type") == "text":
                text_parts.append(content["text"])
            elif content.get("type") == "image":
                text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
            elif content.get("type") == "resource":
                text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")

        return "\n".join(text_parts) if text_parts else str(result)
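
When a `list_changed` notification arrives, it can help to know exactly which tools appeared or disappeared instead of treating the whole list as new. Here is a minimal sketch of that comparison; `diff_tool_names` is an illustrative helper, not part of the classes above.

```python
def diff_tool_names(current: set[str], fresh: list[dict]) -> tuple[set[str], set[str]]:
    """Return (added, removed) tool names between the registry and a new tools/list result."""
    fresh_names = {tool["name"] for tool in fresh}
    return fresh_names - current, current - fresh_names


registered = {"search_docs", "translate_text"}
latest = [{"name": "search_docs"}, {"name": "extract_summary"}]
added, removed = diff_tool_names(registered, latest)
print(sorted(added), sorted(removed))  # ['extract_summary'] ['translate_text']
```

A registry could then call `unregister` for each removed name and `register` only for the added ones, so change handlers fire once per real change.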

Dynamic Registration Usage

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    tools = await discovery.discover()

    print(f"Discovered {len(tools)} tools:")
    for tool in tools:
        print(f"  - {tool.name}: {tool.description}")

    openai_tools = discovery.registry.to_openai_tools()
    print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2, ensure_ascii=False)}")

    claude_tools = discovery.registry.to_claude_tools()
    print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2, ensure_ascii=False)}")

    result = await discovery.call_tool("search_docs", {"query": "MCP protocol parsing"})
    print(f"\nTool result: {result}")

    await client.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
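
Once the tools are exported in OpenAI format, the model answers with tool calls whose arguments arrive as a JSON string. The following sketch turns such a call back into the name and argument dict that `call_tool` expects. The helper name `parse_openai_tool_call` and the sample call id are assumptions made for illustration.

```python
import json
from typing import Any


def parse_openai_tool_call(tool_call: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Extract (tool_name, arguments) from an OpenAI-style tool call dict."""
    function = tool_call["function"]
    raw_args = function.get("arguments") or "{}"
    arguments = json.loads(raw_args)  # the model returns arguments as a JSON string
    if not isinstance(arguments, dict):
        raise ValueError("tool arguments must decode to a JSON object")
    return function["name"], arguments


tool_call = {
    "id": "call_001",  # hypothetical call id
    "type": "function",
    "function": {"name": "search_docs", "arguments": '{"query": "MCP protocol", "limit": 5}'},
}
name, arguments = parse_openai_tool_call(tool_call)
print(name, arguments)  # search_docs {'query': 'MCP protocol', 'limit': 5}
```

The resulting pair can be passed straight to `discovery.call_tool(name, arguments)`.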

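Pattern 4: Sequential Multi-Tool Orchestration

Problem: An AI Agent needs to call several tools in order, where each tool's input depends on the previous tool's output. How do you manage this serial dependency chain cleanly?

Solution: Implement a sequential orchestrator that supports dependency injection, passing intermediate results, and conditional branching.

Sequential Orchestration Flow

User input: "Search the MCP docs and translate the summary"
    │
    ▼
┌──────────┐    result     ┌──────────┐    result     ┌──────────┐
│ Tool 1   │ ────────────> │ Tool 2   │ ────────────> │ Tool 3   │
│ Search   │               │ Extract  │               │ Translate│
│ docs     │               │ summary  │               │ content  │
└──────────┘               └──────────┘               └──────────┘
    │                          │                          │
    │ $search_result           │ $summary_text            │ $translated
    ▼                          ▼                          ▼
 Context Store            Context Store              Context Store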
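
The `$name` markers in the flow stand for values looked up in the context store. As a worked example of that convention, the sketch below resolves dotted references against a context held as nested dicts and lists. The function names `lookup` and `resolve` and the sample context are illustrative assumptions.

```python
from typing import Any


def lookup(context: dict[str, Any], path: str) -> Any:
    """Follow a dotted path through nested dicts and lists; return None if any hop is missing."""
    current: Any = context
    for key in path.split("."):
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return None
    return current


def resolve(arguments: dict[str, Any], context: dict[str, Any]) -> dict[str, Any]:
    """Replace string values of the form "$path" with the value found at that path."""
    return {
        key: lookup(context, value[1:]) if isinstance(value, str) and value.startswith("$") else value
        for key, value in arguments.items()
    }


context = {"user_query": "MCP client", "steps": {"search": {"hits": ["doc-a", "doc-b"]}}}
print(resolve({"query": "$user_query", "top": "$steps.search.hits.0", "limit": 5}, context))
# {'query': 'MCP client', 'top': 'doc-a', 'limit': 5}
```

Because the path is split on dots, step outputs need to live under a nested `steps` dict for a reference like `$steps.search` to resolve.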

Full Implementation

import asyncio
import json
import re
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class StepResult:
    step_name: str
    tool_name: str
    input_args: dict[str, Any]
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


@dataclass
class PipelineStep:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    condition: Callable[[dict[str, Any]], bool] | None = None
    timeout: float = 30.0
    retry_count: int = 0

    def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
        if callable(self.arguments):
            return self.arguments(context)
        resolved = {}
        for key, value in self.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


def _deep_get(data: dict, path: str) -> Any:
    keys = path.split(".")
    current = data
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else None
        else:
            return None
    return current


class SequentialOrchestrator:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery
        self._steps: list[PipelineStep] = []
        self._context: dict[str, Any] = {}

    def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
        self._steps.append(step)
        return self

    def set_context(self, key: str, value: Any):
        self._context[key] = value

    async def execute(self) -> list[StepResult]:
        results = []
        context = dict(self._context)

        for step in self._steps:
            if step.condition and not step.condition(context):
                logger.info(f"Skipping step '{step.name}': condition not met")
                continue

            resolved_args = step.resolve_arguments(context)
            logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= step.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(step.tool_name, resolved_args),
                        timeout=step.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000

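                    # Store the output under a nested path so "$steps.<name>" resolves through
                    # _deep_get, and keep the flat key for conditions that use ctx.get("steps.<name>").
                    context.setdefault("steps", {})[step.name] = output
                    context[f"steps.{step.name}"] = output
                    context["last_result"] = output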

                    results.append(StepResult(
                        step_name=step.name,
                        tool_name=step.tool_name,
                        input_args=resolved_args,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    ))
                    break

                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= step.retry_count:
                        logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
                        await asyncio.sleep(1.0 * attempt)
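            else:
                # while/else: this branch runs only when every attempt failed (no break above).
                duration = (time.monotonic() - start) * 1000
                results.append(StepResult(
                    step_name=step.name,
                    tool_name=step.tool_name,
                    input_args=resolved_args,
                    output=None,
                    success=False,
                    error=last_error,
                    duration_ms=duration,
                ))
                # Later steps depend on this output, so stop the whole pipeline here.
                break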

        return results

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = SequentialOrchestrator(discovery)
    orchestrator.set_context("user_query", "MCP protocol parsing and Python client integration")

    orchestrator.add_step(PipelineStep(
        name="search",
        tool_name="search_docs",
        arguments={"query": "$user_query", "limit": 5},
    ))

    orchestrator.add_step(PipelineStep(
        name="extract",
        tool_name="extract_summary",
        arguments={"text": "$steps.search"},
        condition=lambda ctx: bool(ctx.get("steps.search")),
    ))

    orchestrator.add_step(PipelineStep(
        name="translate",
        tool_name="translate_text",
        arguments={"text": "$steps.extract", "target_lang": "en"},
        condition=lambda ctx: bool(ctx.get("steps.extract")),
    ))

    results = await orchestrator.execute()

    for r in results:
        status = "✓" if r.success else "✗"
        print(f"{status} [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
        if r.error:
            print(f"  Error: {r.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

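Pattern 5: Parallel Multi-Tool Orchestration

Problem: When tool calls do not depend on one another, running them serially wastes time. How do you call multiple MCP tools in parallel safely and aggregate the results?

Solution: Build a parallel orchestrator on asyncio that supports concurrency control, result aggregation, and DAG-based dependency execution.

Parallel Orchestration Architecture

                   ┌───────────────┐
                   │ User request  │
                   └───────┬───────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │   Tool A    │ │   Tool B    │ │   Tool C    │
    │ Search docs │ │ Query DB    │ │ Call API    │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
           │               │               │
           └───────────────┼───────────────┘
                           │
                           ▼
                 ┌───────────────────┐
                 │ Aggregate results │
                 └───────────────────┘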
Full Implementation

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class ParallelTask:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    depends_on: list[str] = field(default_factory=list)
    timeout: float = 30.0
    retry_count: int = 0


@dataclass
class TaskResult:
    name: str
    tool_name: str
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


class ParallelOrchestrator:
    def __init__(self, discovery: ToolDiscovery, max_concurrency: int = 5):
        self._discovery = discovery
        self._max_concurrency = max_concurrency
        self._tasks: list[ParallelTask] = []

    def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
        self._tasks.append(task)
        return self

    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        context = dict(initial_context or {})
        results: dict[str, TaskResult] = {}
        semaphore = asyncio.Semaphore(self._max_concurrency)
        completed = set()
        remaining = list(self._tasks)

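        while remaining:
            # A task is ready once every dependency has finished, successfully or not.
            ready = [
                t for t in remaining
                if all(dep in completed for dep in t.depends_on)
            ]

            if not ready:
                # Tasks remain but none can start: an unknown or circular dependency.
                stuck = ", ".join(t.name for t in remaining)
                logger.error("Unresolvable or circular dependency among: %s", stuck)
                break

            # Skip tasks whose upstream dependencies failed rather than running them on missing input.
            runnable = []
            for task in ready:
                failed = [dep for dep in task.depends_on if not results[dep].success]
                if failed:
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=f"Skipped: dependency failed ({', '.join(failed)})",
                    )
                else:
                    runnable.append(task)

            coros = [self._execute_task(task, context, results, semaphore) for task in runnable]
            task_results = await asyncio.gather(*coros, return_exceptions=True)

            for task, result in zip(runnable, task_results):
                if isinstance(result, Exception):
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=str(result),
                    )
                else:
                    results[task.name] = result
                    context[f"tasks.{task.name}"] = result.output

            for task in ready:
                completed.add(task.name)
                remaining.remove(task)

        return results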

    async def _execute_task(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
        semaphore: asyncio.Semaphore,
    ) -> TaskResult:
        async with semaphore:
            resolved_args = self._resolve_arguments(task, context, results)

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= task.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(task.tool_name, resolved_args),
                        timeout=task.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000
                    return TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    )
                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= task.retry_count:
                        await asyncio.sleep(0.5 * attempt)

            duration = (time.monotonic() - start) * 1000
            return TaskResult(
                name=task.name,
                tool_name=task.tool_name,
                output=None,
                success=False,
                error=last_error,
                duration_ms=duration,
            )

    def _resolve_arguments(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
    ) -> dict[str, Any]:
        if callable(task.arguments):
            return task.arguments(context)

        resolved = {}
        for key, value in task.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
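                if ref_path.startswith("tasks."):
                    # "$tasks.<name>" resolves to that task's output, or None if it has no result yet,
                    # so the argument key is never silently dropped.
                    task_name = ref_path.split(".")[1]
                    prior = results.get(task_name)
                    resolved[key] = prior.output if prior else None
                else:
                    resolved[key] = _deep_get(context, ref_path)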
            else:
                resolved[key] = value
        return resolved


class DagOrchestrator(ParallelOrchestrator):
    def validate_dag(self) -> list[str]:
        errors = []
        task_names = {t.name for t in self._tasks}
        for task in self._tasks:
            for dep in task.depends_on:
                if dep not in task_names:
                    errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")

        visited = set()
        path = set()

        def has_cycle(name: str) -> bool:
            visited.add(name)
            path.add(name)
            task = next((t for t in self._tasks if t.name == name), None)
            if task:
                for dep in task.depends_on:
                    if dep in path:
                        return True
                    if dep not in visited and has_cycle(dep):
                        return True
            path.remove(name)
            return False

        for task in self._tasks:
            if task.name not in visited:
                if has_cycle(task.name):
                    errors.append("Circular dependency detected in DAG")
                    break

        return errors
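To make the scheduling order concrete, the sketch below groups tasks into "waves" using Kahn's algorithm: every task in a wave has all of its dependencies in earlier waves, so a wave can run in parallel. It is an illustrative alternative to the loop above, not the orchestrator's own code; `execution_waves` is a hypothetical helper that takes a plain mapping of task name to dependency names.

```python
from collections import defaultdict


def execution_waves(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves that can each run in parallel (Kahn's algorithm)."""
    unknown = {d for deps in dependencies.values() for d in deps} - set(dependencies)
    if unknown:
        raise ValueError(f"Unknown dependencies: {sorted(unknown)}")

    # indegree = number of distinct dependencies still unfinished for each task.
    indegree = {name: len(set(deps)) for name, deps in dependencies.items()}
    dependents: dict[str, list[str]] = defaultdict(list)
    for name, deps in dependencies.items():
        for dep in set(deps):
            dependents[dep].append(name)

    waves: list[list[str]] = []
    ready = sorted(name for name, degree in indegree.items() if degree == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for name in ready:
            for child in dependents[name]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any task never scheduled is part of a cycle.
    if sum(len(wave) for wave in waves) != len(dependencies):
        raise ValueError("Circular dependency detected")
    return waves
```

For three independent searches feeding one merge step, this yields two waves: the searches first, then the merge.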

Usage example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)

    orchestrator.add_task(ParallelTask(
        name="search_web",
        tool_name="web_search",
        arguments={"query": "MCP協議解析 2026"},
    ))

    orchestrator.add_task(ParallelTask(
        name="search_docs",
        tool_name="search_docs",
        arguments={"query": "Model Context Protocol Python"},
    ))

    orchestrator.add_task(ParallelTask(
        name="query_db",
        tool_name="query_database",
        arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
    ))

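    # The merge step needs all three upstream outputs. A "$tasks.<name>" string can reference
    # only one task, so build the arguments from the shared context with a callable instead.
    # Passing "sources" as a list is an assumption about the merge tool's input schema.
    orchestrator.add_task(ParallelTask(
        name="merge_results",
        tool_name="merge_and_deduplicate",
        arguments=lambda ctx: {
            "sources": [
                ctx.get("tasks.search_web"),
                ctx.get("tasks.search_docs"),
                ctx.get("tasks.query_db"),
            ],
        },
        depends_on=["search_web", "search_docs", "query_db"],
    ))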

    results = await orchestrator.execute({"user_query": "MCP客戶端整合"})

    for name, result in results.items():
        status = "✓" if result.success else "✗"
        print(f"{status} [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
        if result.error:
            print(f"  Error: {result.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

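Pattern 6: Error handling and retries

Problem: An MCP client needs robust error handling and retries when the network is unstable, the server is overloaded, or a tool fails during execution.

Solution: Implement layered error handling, exponential-backoff retries, a circuit breaker, and a fallback strategy.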

Error handling architecture

┌────────────────────────────────────────────┐
│              Error Handling Layer           │
│                                            │
│  ┌──────────┐  ┌──────────┐  ┌─────────┐  │
│  │ Retry    │  │ Circuit  │  │ Fallback│  │
│  │ Strategy │  │ Breaker  │  │ Handler │  │
│  └──────────┘  └──────────┘  └─────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │        Error Classification          │  │
│  │  Transient / Permanent / Unknown     │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘

Full implementation

import asyncio
import logging
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: list[int] = field(default_factory=lambda: [
        -32603,  # Internal error
        -32000,  # Server error
    ])


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig | None = None):
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0
        self._half_open_calls = 0

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.HALF_OPEN:
            if self._half_open_calls < self._config.half_open_max_calls:
                self._half_open_calls += 1
                return True
            return False
        return False

    def record_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._success_count = 0
        else:
            self._failure_count = 0

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.OPEN
            self._success_count = 0
        elif self._failure_count >= self._config.failure_threshold:
            self._state = CircuitState.OPEN


class ResilientMcpClient:
    def __init__(
        self,
        client,
        retry_config: RetryConfig | None = None,
        circuit_config: CircuitBreakerConfig | None = None,
    ):
        self._client = client
        self._retry_config = retry_config or RetryConfig()
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_config or CircuitBreakerConfig()
        self._fallback_handlers: dict[str, Callable] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self._circuit_breakers:
            self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
        return self._circuit_breakers[server_name]

    def register_fallback(self, tool_name: str, handler: Callable):
        self._fallback_handlers[tool_name] = handler

    def classify_error(self, error: Exception) -> ErrorCategory:
        if isinstance(error, (ConnectionError, asyncio.TimeoutError, OSError)):
            return ErrorCategory.TRANSIENT
        if isinstance(error, McpError):
            if error.code in self._retry_config.retryable_errors:
                return ErrorCategory.TRANSIENT
            if error.code in [-32600, -32601, -32602]:
                return ErrorCategory.PERMANENT
        return ErrorCategory.UNKNOWN

    async def call_with_retry(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        server_name: str = "default",
    ) -> Any:
        breaker = self._get_breaker(server_name)

        if not breaker.allow_request():
            fallback = self._fallback_handlers.get(tool_name)
            if fallback:
                logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
                return await fallback(arguments)
            raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")

        last_error = None
        for attempt in range(1, self._retry_config.max_attempts + 1):
            try:
                result = await self._client.call_tool(tool_name, arguments)
                breaker.record_success()
                return result
            except Exception as e:
                last_error = e
                category = self.classify_error(e)

                if category == ErrorCategory.PERMANENT:
                    breaker.record_failure()
                    raise

                if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
                    delay = min(
                        self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
                        self._retry_config.max_delay,
                    )
                    if self._retry_config.jitter:
                        import random
                        delay *= (0.5 + random.random() * 0.5)

                    logger.warning(
                        f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
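                else:
                    # Last attempt or an unclassified error: count the failure and stop,
                    # rather than looping again immediately with no backoff.
                    breaker.record_failure()
                    break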

        if fallback := self._fallback_handlers.get(tool_name):
            logger.warning(f"All retries exhausted for '{tool_name}', using fallback")
            return await fallback(arguments)

        raise last_error


class CircuitOpenError(Exception):
    pass
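Circuit breaker transitions are easier to reason about, and to test, when the clock is injectable. The sketch below is a deliberately simplified breaker and not the CircuitBreaker class above: `TinyBreaker` is a hypothetical name, and it closes after a single successful probe instead of counting half-open calls.

```python
import time
from typing import Callable


class TinyBreaker:
    """Simplified three-state breaker with an injectable clock."""

    def __init__(
        self,
        failure_threshold: int = 2,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: float | None = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        # Any success (including a half-open probe) closes the breaker.
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        if self.state == "half_open":
            # The probe failed: reopen and restart the recovery timer.
            self.opened_at = self.clock()
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
```

In a test, pass `clock=lambda: now[0]` and advance `now[0]` by hand to step through closed, open, half-open, and closed again without sleeping.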

Usage example

import json


async def fallback_search(arguments: dict) -> str:
    # Degraded response returned when the real tool is unavailable.
    return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})


async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    resilient = ResilientMcpClient(
        discovery,
        retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
        circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
    )
    resilient.register_fallback("search_docs", fallback_search)

    try:
        result = await resilient.call_with_retry("search_docs", {"query": "MCP協議解析"})
        print(f"Result: {result}")
    except CircuitOpenError as e:
        print(f"Service unavailable: {e}")
    except Exception as e:
        print(f"Failed: {e}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

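Pattern 7: Streaming response handling

Problem: Long-running tool calls need real-time progress feedback. The MCP protocol supports streaming responses through SSE and notifications.

Solution: Implement a streaming response handler that supports progress notifications, partial-result aggregation, and cancellation.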

Streaming response architecture

┌──────────┐  tools/call   ┌──────────┐
│  Client  │ ────────────> │  Server  │
│          │               │          │
│  Stream  │ <── notify ── │  Tool    │
│  Handler │   progress    │  Running │
│          │ <── notify ── │          │
│          │   progress    │          │
│          │ <── result ── │          │
│          │   complete    │          │
└──────────┘               └──────────┘

Full implementation

import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

logger = logging.getLogger(__name__)


class StreamEventType(Enum):
    PROGRESS = "progress"
    PARTIAL_RESULT = "partial_result"
    COMPLETE = "complete"
    ERROR = "error"
    LOG = "log"


@dataclass
class StreamEvent:
    type: StreamEventType
    data: Any
    progress: float | None = None
    message: str | None = None
    # Monotonic clock: needs no running event loop and suits interval measurement.
    timestamp: float = field(default_factory=time.monotonic)


@dataclass
class StreamResult:
    events: list[StreamEvent] = field(default_factory=list)
    final_result: Any = None
    success: bool = True
    error: str | None = None
    total_duration_ms: float = 0.0


class StreamHandler:
    def __init__(self):
        self._progress_handlers: list[Callable[[StreamEvent], Any]] = []
        self._partial_handlers: list[Callable[[StreamEvent], Any]] = []
        self._complete_handlers: list[Callable[[StreamEvent], Any]] = []
        self._error_handlers: list[Callable[[StreamEvent], Any]] = []
        self._log_handlers: list[Callable[[StreamEvent], Any]] = []

    def on_progress(self, handler: Callable[[StreamEvent], Any]):
        self._progress_handlers.append(handler)

    def on_partial_result(self, handler: Callable[[StreamEvent], Any]):
        self._partial_handlers.append(handler)

    def on_complete(self, handler: Callable[[StreamEvent], Any]):
        self._complete_handlers.append(handler)

    def on_error(self, handler: Callable[[StreamEvent], Any]):
        self._error_handlers.append(handler)

    def on_log(self, handler: Callable[[StreamEvent], Any]):
        self._log_handlers.append(handler)

    async def handle_notification(self, message: dict[str, Any]):
        # Expects the full JSON-RPC notification: {"method": ..., "params": {...}}.
        method = message.get("method", "")
        params = message.get("params") or {}

        if method == "notifications/progress":
            # Progress params carry `progress`, plus an optional `total` and `message`.
            event = StreamEvent(
                type=StreamEventType.PROGRESS,
                data=params,
                progress=params.get("progress", 0) / max(params.get("total") or 1, 1),
                message=params.get("message"),
            )
            await self._dispatch(self._progress_handlers, event, "Progress")

        elif method == "notifications/message":
            # Log params carry `level` and `data`; `data` may be any JSON value.
            level = params.get("level", "info")
            data = params.get("data")
            text = data.get("text", "") if isinstance(data, dict) else str(data or "")
            event = StreamEvent(
                type=StreamEventType.LOG,
                data=data,
                message=f"[{level}] {text}",
            )
            await self._dispatch(self._log_handlers, event, "Log")

    async def _dispatch(self, handlers: list, event: StreamEvent, label: str):
        # Call each handler (sync or async); one failing handler must not stop the rest.
        for handler in handlers:
            try:
                result = handler(event)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as e:
                logger.error(f"{label} handler error: {e}")


class StreamingMcpClient:
    def __init__(self, client, discovery: ToolDiscovery):
        self._client = client
        self._discovery = discovery
        self._stream_handler = StreamHandler()
        self._active_calls: dict[str, asyncio.Task] = {}

        client.on_notification(
            "notifications/progress",
            self._stream_handler.handle_notification,
        )
        client.on_notification(
            "notifications/message",
            self._stream_handler.handle_notification,
        )

    @property
    def stream(self) -> StreamHandler:
        return self._stream_handler

    async def call_with_stream(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        timeout: float = 120.0,
    ) -> StreamResult:
        import time
        start = time.monotonic()
        result = StreamResult()

        # Handlers are not registered per call: doing so would add a new callback on
        # every invocation. The outcome is recorded here and then dispatched once.
        try:
            call_result = await asyncio.wait_for(
                self._discovery.call_tool(tool_name, arguments),
                timeout=timeout,
            )
            result.final_result = call_result
            result.success = True
            event = StreamEvent(type=StreamEventType.COMPLETE, data=call_result)
            handlers = self._stream_handler._complete_handlers
        except asyncio.TimeoutError:
            result.success = False
            result.error = f"Tool call timed out after {timeout}s"
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
            handlers = self._stream_handler._error_handlers
        except Exception as e:
            result.success = False
            result.error = str(e)
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=str(e))
            handlers = self._stream_handler._error_handlers

        result.events.append(event)

        # Notify the registered completion or error callbacks (sync or async).
        for handler in handlers:
            try:
                outcome = handler(event)
                if asyncio.iscoroutine(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"Stream handler error: {e}")

        result.total_duration_ms = (time.monotonic() - start) * 1000
        return result

    async def cancel(self, request_id: int | str):
        await self._client.send_notification("notifications/cancelled", {
            "requestId": request_id,
            "reason": "User cancelled",
        })

Usage example

async def main():
    client = JsonRpcClient(timeout=120.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    streaming = StreamingMcpClient(client, discovery)

    streaming.stream.on_progress(lambda e: print(f"  Progress: {e.progress:.0%} - {e.message or ''}"))
    streaming.stream.on_log(lambda e: print(f"  Log: {e.message}"))
    streaming.stream.on_complete(lambda e: print(f"  Complete!"))
    streaming.stream.on_error(lambda e: print(f"  Error: {e.message}"))

    print("Calling long-running tool with streaming...")
    result = await streaming.call_with_stream(
        "batch_process",
        {"items": list(range(100)), "operation": "analyze"},
        timeout=120.0,
    )

    print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
    if result.final_result:
        # The result may not be a string, so convert it before truncating.
        print(f"Output: {str(result.final_result)[:200]}...")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

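5 common pitfalls and how to fix them

1. The initialization handshake omits the initialized notification

Pitfall: After sending the initialize request, the client forgets to send the notifications/initialized notification, so the server keeps waiting for the handshake to complete and every later request times out.

Solution: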

async def safe_initialize(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "1.0.0"},
    })
    # The initialized notification must be sent to complete the handshake
    await client.send_notification("notifications/initialized")
    return result

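2. Sending requests before the SSE endpoint event arrives

Pitfall: The client sends a JSON-RPC request right after the SSE connection opens, before it has received the endpoint event, so the request goes to the wrong URL.

Solution: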

class SafeSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, data):
        self._endpoint_ready.set()

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
        # The endpoint is known now, so it is safe to send the request
        ...

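3. Tool schema conversion loses the required field

Pitfall: When an MCP tool schema is converted to the OpenAI function calling format, inputSchema is used directly as parameters, but some LLMs require the required field to be present.

Solution: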

def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
    schema = dict(mcp_tool.input_schema)
    if "required" not in schema:
        schema["required"] = []
    if "type" not in schema:
        schema["type"] = "object"
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description,
            "parameters": schema,
        },
    }
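The same normalization can be applied to the raw dictionaries returned by tools/list, which is convenient when no tool class is available. This is a sketch under that assumption: `mcp_tool_to_openai` is a hypothetical helper, and it deep-copies the schema so the registry's copy is never mutated.

```python
import copy
from typing import Any


def mcp_tool_to_openai(tool: dict[str, Any]) -> dict[str, Any]:
    """Convert one raw tools/list entry to an OpenAI-style function definition."""
    # Deep copy so that filling in defaults never changes the caller's schema.
    schema = copy.deepcopy(tool.get("inputSchema") or {})
    schema.setdefault("type", "object")
    schema.setdefault("properties", {})
    schema.setdefault("required", [])
    return {
        "type": "function",
        "function": {
            "name": tool["name"],
            "description": tool.get("description", ""),
            "parameters": schema,
        },
    }


raw_tool = {
    "name": "search_docs",
    "description": "Search the documentation",
    "inputSchema": {"type": "object", "properties": {"query": {"type": "string"}}},
}
converted = mcp_tool_to_openai(raw_tool)
```

Here `converted["function"]["parameters"]["required"]` is an empty list, while `raw_tool["inputSchema"]` is left without a required key.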

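4. Shared-state races in parallel calls

Pitfall: Multiple parallel tool calls share the same httpx.AsyncClient or asyncio.Future dictionary, so responses get matched to the wrong requests.

Solution: Give each parallel call its own request ID space, or isolate calls with a connection pool: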

class IsolatedClientPool:
    def __init__(self, base_url: str, pool_size: int = 5):
        self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
        self._index = 0
        self._lock = asyncio.Lock()

    async def get_client(self) -> McpSseClient:
        async with self._lock:
            client = self._clients[self._index % len(self._clients)]
            self._index += 1
            return client
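The other option mentioned above, keeping request IDs unambiguous on a shared connection, can be sketched as a small registry that hands out a unique ID with each future. `PendingRequests` is a hypothetical name; the point is that responses arriving out of order still resolve the right caller, and a stray ID is ignored rather than mismatched.

```python
import asyncio
import itertools
from typing import Any


class PendingRequests:
    """Allocate unique request IDs and match responses back to their futures."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}

    def create(self) -> tuple[int, asyncio.Future]:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, request_id: int, result: Any) -> bool:
        # pop() guarantees each response is delivered to at most one waiter.
        future = self._pending.pop(request_id, None)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True


async def demo_pending() -> tuple:
    pending = PendingRequests()
    id_a, fut_a = pending.create()
    id_b, fut_b = pending.create()
    # Responses arrive in the opposite order from the requests.
    pending.resolve(id_b, "result-b")
    pending.resolve(id_a, "result-a")
    stray = pending.resolve(999, "unexpected")
    return id_a, id_b, await fut_a, await fut_b, stray
```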

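5. Progress notifications confused with results in streaming responses

Pitfall: On an SSE channel, progress notifications and the final result arrive on the same stream, and the client treats a progress notification as the final result.

Solution: Distinguish message types strictly, and treat only messages that carry an id field and no method as request results:

async def _handle_sse_message(self, message: dict):
    if "id" in message and "method" not in message:
        # A response: match it to the pending future by id
        future = self._pending.pop(message["id"], None)
        if future and not future.done():
            if "error" in message:
                # Error responses carry "error" instead of "result";
                # substitute the client's own error type here if it has one
                future.set_exception(RuntimeError(str(message["error"])))
            else:
                future.set_result(message.get("result"))
    elif "method" in message and "id" not in message:
        # A notification (no id): hand it to the stream handler
        await self._stream_handler.handle_notification(message)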
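The routing rule follows directly from JSON-RPC 2.0: a request has both method and id, a notification has method but no id, and a response has id plus either result or error. A small classifier makes the four cases explicit; `classify_message` is a hypothetical helper for illustration.

```python
from typing import Any


def classify_message(message: dict[str, Any]) -> str:
    """Classify a decoded JSON-RPC 2.0 message by which fields it carries."""
    has_id = "id" in message
    has_method = "method" in message
    if has_method and has_id:
        return "request"          # the peer expects a response from us
    if has_method:
        return "notification"     # one-way, e.g. notifications/progress
    if has_id and "error" in message:
        return "error_response"
    if has_id and "result" in message:
        return "response"
    return "invalid"
```

Only the "response" and "error_response" cases should ever complete a pending future.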

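Troubleshooting 10 common errors

| Error message | Cause | Fix |
|---|---|---|
| MCP Error [-32600]: Invalid Request | The JSON-RPC message is malformed or missing a required field | Check that the request includes the jsonrpc, method, and id fields; validate the format with /zh-TW/json/format |
| MCP Error [-32601]: Method not found | The client called a method the server does not support | Call initialize first to confirm the server's capabilities; check the spelling of the method name |
| MCP Error [-32602]: Invalid params | The tool argument types or structure do not match | Fetch the correct inputSchema with tools/list and check the arguments against it |
| Connection refused | The MCP server is not running, or the port is wrong | Confirm the server process is running; check the port and URL settings |
| SSE connection timeout | The network is unreachable, or the server did not respond to the SSE handshake | Check firewall rules; confirm the SSE endpoint is reachable |
| Request timed out after 30s | Tool execution took longer than the client timeout | Increase the timeout parameter, or use streaming responses to get progress |
| Circuit breaker open | Consecutive failures exceeded the breaker threshold | Check server health; retry after the recovery timeout |
| Tool not found: xxx | The client called a tool name that is not registered | Re-run tools/list discovery; check the case of the tool name |
| Endpoint not received | No endpoint event arrived after the SSE connection opened | Confirm the server sends the endpoint event correctly; check the SSE route configuration |
| JSON decode error in SSE stream | The SSE data is not valid JSON | Check the server-side SSE event format; make sure the data field is valid JSON |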

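Advanced optimization techniques

1. Connection pooling and multi-server management

In production, an AI Agent usually needs to connect to several MCP servers at once. Use a connection pool to manage the client instances: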

class McpConnectionPool:
    def __init__(self):
        self._connections: dict[str, Any] = {}
        self._discovery_map: dict[str, ToolDiscovery] = {}

    async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
        client = JsonRpcClient(timeout=30.0)
        await client.connect_stdio(command, args)
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    async def connect_sse(self, name: str, url: str):
        client = McpSseClient(url)
        await client.connect()
        await client.initialize()
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    def get_discovery(self, name: str) -> ToolDiscovery | None:
        return self._discovery_map.get(name)

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        for name, discovery in self._discovery_map.items():
            if discovery.registry.get(tool_name):
                return await discovery.call_tool(tool_name, arguments)
        raise ValueError(f"Tool '{tool_name}' not found in any connected server")

    async def close_all(self):
        for client in self._connections.values():
            await client.close()

2. Tool call caching

Cache the results of idempotent tool calls to cut down on repeated requests:

import hashlib
import json
import time
from typing import Any


class ToolCallCache:
    def __init__(self, ttl: float = 300.0, max_size: int = 1000):
        self._cache: dict[str, tuple[float, Any]] = {}
        self._ttl = ttl
        self._max_size = max_size

    def _make_key(self, tool_name: str, arguments: dict) -> str:
        # Prefix the key with the tool name so invalidate() can match every entry for a tool.
        raw = json.dumps(arguments, sort_keys=True)
        return f"{tool_name}:{hashlib.sha256(raw.encode()).hexdigest()}"

    def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
        key = self._make_key(tool_name, arguments)
        if key in self._cache:
            ts, value = self._cache[key]
            if time.monotonic() - ts < self._ttl:
                return True, value
            # Expired entry: drop it and report a miss.
            del self._cache[key]
        return False, None

    def set(self, tool_name: str, arguments: dict, value: Any):
        key = self._make_key(tool_name, arguments)
        if key not in self._cache and len(self._cache) >= self._max_size:
            # Evict the oldest entry to stay within max_size.
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        self._cache[key] = (time.monotonic(), value)

    def invalidate(self, tool_name: str, arguments: dict | None = None):
        if arguments is not None:
            # Remove the single entry for this exact call.
            self._cache.pop(self._make_key(tool_name, arguments), None)
        else:
            # Remove every cached entry for this tool.
            prefix = f"{tool_name}:"
            for k in [k for k in self._cache if k.startswith(prefix)]:
                del self._cache[k]
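A read-through wrapper shows how such a cache is typically used around a tool call. This is a self-contained sketch rather than a wrapper for the class above: `cache_key`, `cached_call`, and the plain dict store are hypothetical, and `call_tool` is any coroutine function taking a tool name and arguments. Because the key is built with `sort_keys=True`, argument order does not affect cache hits.

```python
import asyncio
import hashlib
import json
import time
from typing import Any, Awaitable, Callable

CacheStore = dict[str, tuple[float, Any]]


def cache_key(tool_name: str, arguments: dict[str, Any]) -> str:
    raw = json.dumps(arguments, sort_keys=True, ensure_ascii=False)
    return f"{tool_name}:{hashlib.sha256(raw.encode('utf-8')).hexdigest()}"


async def cached_call(
    store: CacheStore,
    call_tool: Callable[[str, dict[str, Any]], Awaitable[Any]],
    tool_name: str,
    arguments: dict[str, Any],
    ttl: float = 300.0,
) -> Any:
    """Return a fresh cached value if present, otherwise call the tool and store it."""
    key = cache_key(tool_name, arguments)
    hit = store.get(key)
    if hit is not None and time.monotonic() - hit[0] < ttl:
        return hit[1]
    value = await call_tool(tool_name, arguments)
    store[key] = (time.monotonic(), value)
    return value


async def demo_cache() -> tuple[int, Any, Any]:
    calls = 0

    async def fake_call_tool(name: str, args: dict[str, Any]) -> dict[str, Any]:
        nonlocal calls
        calls += 1
        return {"tool": name, "call_number": calls}

    store: CacheStore = {}
    first = await cached_call(store, fake_call_tool, "search_docs", {"query": "mcp", "limit": 5})
    # Same arguments in a different key order: served from the cache.
    second = await cached_call(store, fake_call_tool, "search_docs", {"limit": 5, "query": "mcp"})
    return calls, first, second
```

Only cache tools that are genuinely idempotent; a cached result for a tool with side effects would silently skip the side effect.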

3. Observability and tracing

Add OpenTelemetry tracing to MCP client calls:

from typing import Any

from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")

tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram("mcp.tool.duration", description="MCP tool call duration")


class ObservableMcpClient:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        with tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
            span.set_attribute("mcp.tool.name", tool_name)
            span.set_attribute("mcp.tool.args_count", len(arguments))

            import time
            start = time.monotonic()
            try:
                result = await self._discovery.call_tool(tool_name, arguments)
                duration = time.monotonic() - start

                span.set_status(Status(StatusCode.OK))
                tool_call_counter.add(1, {"tool": tool_name, "status": "success"})
                tool_call_duration.record(duration, {"tool": tool_name})

                return result
            except Exception as e:
                duration = time.monotonic() - start
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                tool_call_counter.add(1, {"tool": tool_name, "status": "error"})
                tool_call_duration.record(duration, {"tool": tool_name})
                raise

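Comparison: 3 orchestration approaches

| Dimension | Serial orchestration | Parallel orchestration | DAG orchestration |
|---|---|---|---|
| Execution | Sequential; each step starts after the previous one finishes | All tasks run at the same time | Runs in topological order of the dependencies |
| Best suited to | Tools with strict data dependencies | Fully independent tools | Tools that are partly dependent and partly independent |
| Latency | Sum of all tool durations | Duration of the slowest tool | Sum of durations along the critical path |
| Error propagation | If a step fails, all later steps are skipped | A single failure does not affect the others | Tasks downstream of a failed node are skipped |
| Implementation complexity | | | |
| Resource usage | Low (one connection in use at a time) | High (needs a concurrent connection pool) | Medium (concurrency per level) |
| Result consistency | Strong (deterministic order) | Weak (completion order is not deterministic) | Medium (order within a level is not deterministic) |
| Timeout control | Independent timeout per step | Global timeout plus per-task timeout | Per-step timeout plus global timeout |
| Typical scenario | Search → extract → translate | Search 3 data sources at once | Search A+B → merge → translate |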

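Choosing an approach

  • Tools with strict causal dependencies → serial orchestration
  • Fully independent tools → parallel orchestration
  • Tools with partial dependencies → DAG orchestration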
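The latency row of the comparison can be checked with a small calculation. The sketch below estimates total latency for each approach from per-tool durations; the task names and millisecond figures are invented for illustration, the helper functions are hypothetical, and the DAG estimate assumes an acyclic graph with unlimited concurrency.

```python
def serial_latency(durations: dict[str, int]) -> int:
    """Serial: every tool waits for the previous one, so durations add up."""
    return sum(durations.values())


def parallel_latency(durations: dict[str, int]) -> int:
    """Fully parallel: the slowest tool determines the total."""
    return max(durations.values(), default=0)


def dag_latency(durations: dict[str, int], deps: dict[str, list[str]]) -> int:
    """DAG: the longest dependency chain (critical path) determines the total."""
    finish: dict[str, int] = {}

    def finish_time(name: str) -> int:
        if name not in finish:
            # A task starts once its slowest dependency has finished.
            start = max((finish_time(d) for d in deps.get(name, [])), default=0)
            finish[name] = start + durations[name]
        return finish[name]

    return max((finish_time(name) for name in durations), default=0)


# Invented example: search A and B in parallel, then merge, then translate (milliseconds).
durations = {"search_a": 200, "search_b": 300, "merge": 100, "translate": 200}
deps = {"merge": ["search_a", "search_b"], "translate": ["merge"]}

serial_ms = serial_latency(durations)
dag_ms = dag_latency(durations, deps)
```

For this example the serial total is 800 ms and the DAG total is 600 ms, because only the 200 ms search overlaps with the 300 ms one; the rest of the chain is inherently sequential.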

For more on AI Agent workflow orchestration, see AI Agent Workflow DAG Orchestration and AI Agent Protocol Comparison.


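Recommended online tools

  • JSON formatter: when debugging MCP protocol messages, use /zh-TW/json/format to format JSON-RPC requests and responses
  • cURL to code: when testing MCP server interfaces, use /zh-TW/dev/curl-to-code to generate Python client code quickly
  • Base64 encode/decode: when handling binary data in MCP transport, use /zh-TW/encode/base64 to encode and decode

Summary: Integrating a Python MCP client takes far more than sending JSON-RPC requests. From basic stdio connections to remote SSE transport, from tool discovery and dynamic registration to serial, parallel, and DAG orchestration of multiple tools, and from error handling with circuit breaking and fallback to streaming responses with progress tracking, every stage calls for production-grade engineering practice. With these 7 patterns, your AI Agent gains reliable, efficient, and observable tool calling. Remember: MCP protocol parsing is the foundation, multi-tool orchestration is the core, error handling is the safeguard, and streaming responses are the experience. For the latest protocol definitions, see the official MCP specification, the Model Context Protocol Specification.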
