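Python MCP Client Integration in Practice: 7 Production Patterns from Protocol Parsing to Multi-Tool Orchestration
Why does your AI application keep failing to connect to external tools? 5 pain points of MCP client integration
You spent three days writing a perfect MCP Server, and Claude Desktop still can't connect to it? Your AI Agent calls three tools, the first one times out, and everything after it fails? Tool discovery returns a pile of schemas, and you don't know how to convert them into a function-calling format the LLM can use? Your SSE connection keeps dropping and reconnecting, and you lose data without noticing? When you orchestrate several tools, running them one by one is too slow, but running them in parallel turns into chaos?
The root cause is that most developers focus only on building the MCP Server and overlook how complex the client-side integration is. MCP defines a standard way to communicate, but the client still has to solve a whole set of engineering problems: building JSON-RPC messages, adapting to different transports, discovering and converting tools, choosing an orchestration strategy, and recovering from errors.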
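Key takeaways:
- Master the JSON-RPC 2.0 communication mechanism behind MCP and build reliable client-server connections
- Learn tool discovery and dynamic registration so your AI Agent adapts to new tools automatically
- Understand 7 production patterns, from sequential tool calls to parallel DAG execution
- Build solid error handling and retry mechanisms that cope with network jitter and unavailable services
- Handle SSE streaming responses to report tool-call progress in real time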
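Contents
- Core MCP Protocol Concepts
- Pattern 1: Basic JSON-RPC Client
- Pattern 2: SSE Transport Implementation
- Pattern 3: Tool Discovery and Dynamic Registration
- Pattern 4: Sequential Multi-Tool Orchestration
- Pattern 5: Parallel Multi-Tool Orchestration
- Pattern 6: Error Handling and Retries
- Pattern 7: Streaming Response Handling
- 5 Common Pitfalls and Their Solutions
- Troubleshooting 10 Common Errors
- Advanced Optimization Techniques
- Comparison: 3 Orchestration Approaches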
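Core MCP Protocol Concepts
Before diving into the 7 production patterns, we need to understand where the MCP client sits in the protocol stack and which core concepts it deals with.
MCP client architecture overview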
┌─────────────────────────────────────────────────┐
│                 AI Application                  │
│    (Claude Desktop / ChatGPT / custom Agent)    │
├─────────────────────────────────────────────────┤
│                MCP Client Layer                 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │  JSON-RPC   │ │  Transport  │ │Tool Registry│ │
│ │   Handler   │ │   Adapter   │ │ & Discovery │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Orchestration│ │   Error &   │ │   Stream    │ │
│ │             │ │    Retry    │ │   Handler   │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────┤
│                 Transport Layer                 │
│          stdio / SSE / Streamable HTTP          │
├─────────────────────────────────────────────────┤
│                   MCP Server                    │
│           Tools / Resources / Prompts           │
└─────────────────────────────────────────────────┘
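Core concepts at a glance
| Concept | Description | JSON-RPC method | Analogy |
|---|---|---|---|
| Initialize | Handshake between client and server | initialize | TCP three-way handshake |
| Tool Discovery | Fetch the list of tools the server supports | tools/list | DNS lookup |
| Tool Call | Invoke a specific tool and get its result | tools/call | HTTP POST |
| Resource Read | Read a resource exposed by the server | resources/read | HTTP GET |
| Prompt Get | Fetch a predefined prompt template | prompts/get | Template engine |
| Notification | One-way message that expects no response | (no id field) | UDP broadcast |
| Cancel | Cancel a request that is still in progress | notifications/cancelled | Aborting an HTTP request |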
JSON-RPC 2.0 message format
MCP is built on JSON-RPC 2.0, so the message format is the foundation of client development:
// Request
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "search_docs",
    "arguments": {"query": "MCP protocol"}
  },
  "id": 1
}
// Success response
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {"type": "text", "text": "Search results..."}
    ]
  },
  "id": 1
}
// Error response
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32600,
    "message": "Invalid Request"
  },
  "id": 1
}
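All three shapes share one wire format, so a client tells them apart by which fields are present. A message with `method` and `id` is a request. A message with `method` but no `id` is a notification. A message with `result` or `error` plus an `id` is a response. The sketch below shows that dispatch rule in minimal form. The function name `classify_message` is illustrative and not part of any SDK.

```python
from typing import Any


def classify_message(msg: dict[str, Any]) -> str:
    """Classify a decoded JSON-RPC 2.0 message by the fields it carries."""
    if msg.get("jsonrpc") != "2.0":
        return "invalid"
    if "method" in msg:
        # Requests expect a reply; notifications omit "id" and get none
        return "request" if "id" in msg else "notification"
    if "id" in msg and "error" in msg:
        return "error_response"
    if "id" in msg and "result" in msg:
        return "response"
    return "invalid"


print(classify_message({"jsonrpc": "2.0", "method": "tools/list", "id": 1}))  # → request
```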
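If you're interested in MCP Server development, see our earlier article, Python MCP Server Development in Practice. For the lower-level AI function-calling protocol, we recommend AI Function Calling in Production.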
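Pattern 1: Basic JSON-RPC Client
Problem: Your AI application needs to connect to an MCP Server, but you're not sure how to build correct JSON-RPC messages, manage request IDs, or handle responses.
Solution: Build a complete JSON-RPC 2.0 client that wraps message construction, sending, response matching, and timeout handling.
Full implementation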
import asyncio
import json
import os
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class JsonRpcRequest:
    jsonrpc: str = "2.0"
    method: str = ""
    params: dict[str, Any] = field(default_factory=dict)
    id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)


@dataclass
class JsonRpcResponse:
    id: int | str
    result: Any = None
    error: dict[str, Any] | None = None


NotificationHandler = Callable[[dict[str, Any]], Awaitable[None]]


class JsonRpcClient:
    def __init__(self, timeout: float = 30.0):
        self._request_id = 0
        self._pending: dict[int | str, asyncio.Future] = {}
        self._timeout = timeout
        self._notification_handlers: dict[str, NotificationHandler] = {}
        self._writer: asyncio.StreamWriter | None = None
        self._reader: asyncio.StreamReader | None = None
        self._process: asyncio.subprocess.Process | None = None
        self._read_task: asyncio.Task | None = None
        self._handler_tasks: set[asyncio.Task] = set()

    def _next_id(self) -> int:
        # Monotonically increasing IDs are unique within one session
        self._request_id += 1
        return self._request_id

    async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
        proc_env = {**os.environ, **(env or {})}
        self._process = await asyncio.create_subprocess_exec(
            command,
            *(args or []),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=None,  # inherit: an unread stderr PIPE can fill up and stall the server
            env=proc_env,
        )
        self._reader = self._process.stdout
        self._writer = self._process.stdin
        # Keep a reference so the read-loop task is not garbage-collected
        self._read_task = asyncio.create_task(self._read_loop())

    async def _read_loop(self):
        # stdio transport: exactly one JSON-RPC message per newline-terminated line
        while self._reader and not self._reader.at_eof():
            line = await self._reader.readline()
            if not line.strip():
                continue
            try:
                message = json.loads(line)
            except ValueError:
                continue  # skip output that is not valid JSON (covers JSONDecodeError and bad UTF-8)
            await self._handle_message(message)

    async def _handle_message(self, message: dict):
        if "method" in message:
            if "id" in message:
                # Server-to-client request: answer ping, reject what this client does not implement
                if message["method"] == "ping":
                    reply = {"jsonrpc": "2.0", "id": message["id"], "result": {}}
                else:
                    reply = {
                        "jsonrpc": "2.0",
                        "id": message["id"],
                        "error": {"code": -32601, "message": f"Method not found: {message['method']}"},
                    }
                await self._write(reply)
            elif handler := self._notification_handlers.get(message["method"]):
                # Run handlers as tasks: a handler that awaits a request would otherwise
                # block this read loop, which is the only thing that can deliver the response
                task = asyncio.create_task(handler(message.get("params", {})))
                self._handler_tasks.add(task)
                task.add_done_callback(self._handler_tasks.discard)
            return
        future = self._pending.pop(message.get("id"), None)
        if future and not future.done():
            if "error" in message:
                future.set_exception(
                    McpError(message["error"]["code"], message["error"]["message"])
                )
            else:
                future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))

    async def _write(self, payload: dict[str, Any]):
        if not self._writer:
            raise RuntimeError("Not connected: call connect_stdio() first")
        self._writer.write((json.dumps(payload) + "\n").encode())
        await self._writer.drain()

    async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        request_id = self._next_id()
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await self._write(request)
            response = await asyncio.wait_for(future, timeout=self._timeout)
            return response.result
        except asyncio.TimeoutError:
            raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s") from None
        finally:
            # Remove the entry on success, timeout, or write failure alike
            self._pending.pop(request_id, None)

    async def send_notification(self, method: str, params: dict[str, Any] | None = None):
        # Notifications carry no "id", so the server never replies
        await self._write({
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        })

    def on_notification(self, method: str, handler: NotificationHandler):
        self._notification_handlers[method] = handler

    async def close(self):
        if self._writer:
            self._writer.close()
        if self._read_task:
            self._read_task.cancel()
        if self._process:
            if self._process.returncode is None:
                self._process.terminate()
            await self._process.wait()


class McpError(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"MCP Error [{code}]: {message}")


class McpTimeoutError(Exception):
    pass
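The `_read_loop` above relies on `StreamReader.readline()` to split the stdio stream into messages. If you read raw byte chunks instead, for example from a socket or a custom transport, one message can arrive split across several chunks, or several messages can arrive in one chunk. The sketch below is a minimal buffer that emits only complete newline-delimited JSON messages. `LineFramer` is a hypothetical name.

```python
import json
from typing import Any


class LineFramer:
    """Accumulates bytes and returns complete newline-delimited JSON messages."""

    def __init__(self) -> None:
        self._buffer = b""

    def feed(self, chunk: bytes) -> list[dict[str, Any]]:
        self._buffer += chunk
        # Everything before the last newline is complete; the tail waits for more data
        *lines, self._buffer = self._buffer.split(b"\n")
        messages = []
        for line in lines:
            if not line.strip():
                continue
            try:
                messages.append(json.loads(line))
            except ValueError:
                continue  # skip stray non-JSON output, as _read_loop does
        return messages


framer = LineFramer()
print(framer.feed(b'{"jsonrpc": "2.0", "id": 1, "res'))  # → []
print(framer.feed(b'ult": {}}\n{"jsonrpc": "2.0", "method": "ping"}\n'))
```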
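Initialization handshake
async def initialize_client(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        # Declare only the capabilities (e.g. roots, sampling) this client actually handles;
        # otherwise the server may send requests the client cannot answer
        "capabilities": {},
        "clientInfo": {
            "name": "toolsku-mcp-client",
            "version": "1.0.0",
        },
    })
    # The session is ready only after the initialized notification has been sent
    await client.send_notification("notifications/initialized")
    return result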
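`initialize_client` returns the server's result without inspecting it. During the handshake, the server answers with the protocol version it will actually use, and that version may differ from the one the client requested. A careful client therefore checks the version before it sends `notifications/initialized`. The sketch below is minimal. The supported-version tuple and `check_negotiated_version` are assumptions made for illustration.

```python
# Assumption: the protocol versions this particular client implements
SUPPORTED_VERSIONS = ("2025-03-26", "2024-11-05")


class UnsupportedProtocolVersion(Exception):
    pass


def check_negotiated_version(init_result: dict, supported: tuple[str, ...] = SUPPORTED_VERSIONS) -> str:
    """Return the version the server chose, or raise if this client cannot speak it."""
    version = init_result.get("protocolVersion")
    if version not in supported:
        # The client should disconnect rather than continue with an unknown version
        raise UnsupportedProtocolVersion(f"Server chose unsupported protocol version: {version!r}")
    return version


print(check_negotiated_version({"protocolVersion": "2024-11-05", "capabilities": {}}))
```

In `initialize_client`, you would call this on `result` right after the `initialize` request and close the connection if it raises.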
Usage example
async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        server_info = await initialize_client(client)
        print(f"Connected to: {server_info['serverInfo']['name']}")
        tools = await client.send_request("tools/list")
        for tool in tools.get("tools", []):
            print(f"  - {tool['name']}: {tool.get('description', '')}")
        result = await client.send_request("tools/call", {
            "name": "search_docs",
            "arguments": {"query": "MCP protocol parsing"},
        })
        print(f"Result: {result}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
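Pattern 2: SSE Transport Implementation
Problem: The stdio transport only works between processes on the same machine. In production you need to reach remote MCP Servers over the network, and SSE (Server-Sent Events) is a remote transport that MCP supports. This HTTP+SSE transport comes from the 2024-11-05 revision of the spec. The 2025-03-26 revision replaces it with Streamable HTTP, but many servers still expose the SSE endpoints.
Solution: Implement a complete SSE transport layer, including connection management, event parsing, message routing, and automatic reconnection.
SSE transport architecture
┌──────────────┐      POST /messages     ┌──────────────┐
│  MCP Client  │ ──────────────────────> │  MCP Server  │
│              │                         │              │
│ SSE Handler  │ <─────── GET /sse ───── │ SSE Endpoint │
│              │       EventStream       │              │
└──────────────┘                         └──────────────┘
Flow:
  1. GET /sse to open the SSE connection
  2. Receive the "endpoint" event, which carries the URL for POST requests
  3. POST JSON-RPC requests to /messages
  4. Receive JSON-RPC responses over the SSE stream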
Full implementation
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Any, Callable
from urllib.parse import urljoin

import httpx

logger = logging.getLogger(__name__)


@dataclass
class SseEvent:
    event: str = "message"
    data: str = ""
    id: str | None = None
    retry: int | None = None


class SseTransport:
    def __init__(
        self,
        server_url: str,
        reconnect_interval: float = 5.0,
        max_reconnect_attempts: int = 10,
        headers: dict[str, str] | None = None,
    ):
        self._server_url = server_url.rstrip("/")
        self._sse_url = f"{self._server_url}/sse"
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_attempts = max_reconnect_attempts
        self._headers = headers or {}
        self._message_endpoint: str | None = None
        self._endpoint_ready = asyncio.Event()
        self._session: httpx.AsyncClient | None = None
        self._running = False
        self._reconnect_count = 0
        self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
        self._last_event_id: str | None = None
        self._listener_task: asyncio.Task | None = None

    async def connect(self, endpoint_timeout: float = 10.0):
        self._session = httpx.AsyncClient(
            headers=self._headers,
            timeout=httpx.Timeout(30.0, read=300.0),
        )
        self._running = True
        self._listener_task = asyncio.create_task(self._sse_listener())
        try:
            # Nothing can be POSTed until the server announces its message endpoint
            await asyncio.wait_for(self._endpoint_ready.wait(), timeout=endpoint_timeout)
        except asyncio.TimeoutError:
            await self.close()
            raise

    async def _sse_listener(self):
        while self._running:
            try:
                headers = {"Accept": "text/event-stream"}
                if self._last_event_id:
                    # Last-Event-ID is an HTTP request header, not a query parameter
                    headers["Last-Event-ID"] = self._last_event_id
                async with self._session.stream("GET", self._sse_url, headers=headers) as response:
                    response.raise_for_status()
                    self._reconnect_count = 0
                    current_event = SseEvent()
                    async for line in response.aiter_lines():
                        if not self._running:
                            break
                        if line.startswith(":"):
                            continue  # comment / keep-alive
                        if line == "":
                            # A blank line ends the current event
                            if current_event.data:
                                await self._dispatch_event(current_event)
                            current_event = SseEvent()
                            continue
                        field_name, _, field_value = line.partition(":")
                        if field_value.startswith(" "):
                            field_value = field_value[1:]  # strip exactly one leading space
                        if field_name == "event":
                            current_event.event = field_value
                        elif field_name == "data":
                            if current_event.data:
                                current_event.data += "\n"
                            current_event.data += field_value
                        elif field_name == "id":
                            current_event.id = field_value
                            self._last_event_id = field_value
                        elif field_name == "retry" and field_value.isdigit():
                            current_event.retry = int(field_value)
            except httpx.HTTPStatusError as e:
                logger.error(f"SSE HTTP error: {e.response.status_code}")
            except httpx.RequestError as e:
                logger.error(f"SSE connection error: {e}")
            except Exception as e:
                logger.error(f"SSE unexpected error: {e}")
            if self._running:
                self._reconnect_count += 1
                if self._reconnect_count > self._max_reconnect_attempts:
                    logger.error("Max reconnect attempts reached, stopping")
                    self._running = False
                    break
                # A new connection announces a new endpoint (new session); hold sends until then
                self._message_endpoint = None
                self._endpoint_ready.clear()
                wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
                logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
                await asyncio.sleep(wait)

    async def _dispatch_event(self, event: SseEvent):
        if event.event == "endpoint":
            # The endpoint may be relative ("/messages?sessionId=...") or absolute
            self._message_endpoint = urljoin(self._sse_url, event.data)
            self._endpoint_ready.set()
            logger.info(f"Message endpoint: {self._message_endpoint}")
        for handler in self._event_handlers.get(event.event, []):
            try:
                if event.event == "endpoint":
                    await handler(event.data)
                else:
                    await handler(json.loads(event.data))
            except Exception as e:
                logger.error(f"Event handler error: {e}")

    async def send_message(self, message: dict[str, Any]) -> None:
        if not self._session:
            raise RuntimeError("SSE session not connected")
        if not self._message_endpoint:
            raise RuntimeError("SSE transport not initialized, endpoint not received")
        # json= serializes the body and sets Content-Type: application/json
        response = await self._session.post(self._message_endpoint, json=message)
        response.raise_for_status()

    def on_event(self, event_type: str, handler: Callable):
        self._event_handlers.setdefault(event_type, []).append(handler)

    async def close(self):
        self._running = False
        if self._listener_task:
            self._listener_task.cancel()
        if self._session:
            await self._session.aclose()
Putting it together
class McpSseClient:
    def __init__(self, server_url: str, timeout: float = 60.0):
        self._transport = SseTransport(server_url)
        self._request_id = 0
        self._timeout = timeout
        self._pending: dict[int, asyncio.Future] = {}
        self._transport.on_event("message", self._on_message)

    async def connect(self):
        await self._transport.connect()

    async def _on_message(self, message: dict):
        # Responses arrive over the SSE stream, not in the body of the POST response
        if "id" in message and "method" not in message:
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
                else:
                    future.set_result(message.get("result"))

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        self._request_id += 1
        request_id = self._request_id  # local copy: concurrent calls keep changing self._request_id
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await self._transport.send_message(request)
            return await asyncio.wait_for(future, timeout=self._timeout)
        except asyncio.TimeoutError:
            raise McpTimeoutError(f"Request {method} timed out") from None
        finally:
            self._pending.pop(request_id, None)

    async def initialize(self) -> dict:
        result = await self.send_request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
        })
        await self._transport.send_message({
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        })
        return result

    async def close(self):
        await self._transport.close()


async def main():
    client = McpSseClient("http://localhost:8080")
    await client.connect()  # returns once the endpoint event has arrived
    try:
        info = await client.initialize()
        print(f"Connected: {info}")
        tools = await client.send_request("tools/list")
        print(f"Available tools: {tools}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
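Pattern 3: Tool Discovery and Dynamic Registration
Problem: An MCP Server's tool list is dynamic. The client has to discover tools automatically, convert them into a function-calling format the LLM can use, and react when the server announces that its tool list has changed.
Solution: Implement tool discovery, schema conversion, dynamic registration, and change listening.
Tool discovery architecture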
┌──────────────┐   tools/list    ┌──────────────┐
│  MCP Client  │ ──────────────> │  MCP Server  │
│              │ <────────────── │              │
│     Tool     │   Tool Schema   │     Tool     │
│   Registry   │                 │ Definitions  │
│              │     tools/      │              │
│  ┌────────┐  │  list_changed   │              │
│  │ OpenAI │  │ <────────────── │              │
│  │ Format │  │                 │              │
│  ├────────┤  │                 │              │
│  │ Claude │  │                 │              │
│  │ Format │  │                 │              │
│  ├────────┤  │                 │              │
│  │ Gemini │  │                 │              │
│  │ Format │  │                 │              │
│  └────────┘  │                 │              │
└──────────────┘                 └──────────────┘
Full implementation
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


class ToolExecutionError(Exception):
    """A tool ran but reported failure (isError: true in its result)."""


@dataclass
class McpTool:
    name: str
    description: str
    input_schema: dict[str, Any]
    annotations: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mcp(cls, data: dict) -> "McpTool":
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            input_schema=data.get("inputSchema", {}),
            annotations=data.get("annotations", {}),
        )


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, McpTool] = {}
        self._change_handlers: list[Callable[[str, McpTool], None]] = []

    def register(self, tool: McpTool):
        self._tools[tool.name] = tool
        logger.info(f"Registered tool: {tool.name}")
        self._notify_change("registered", tool)

    def unregister(self, name: str):
        if name in self._tools:
            tool = self._tools.pop(name)
            logger.info(f"Unregistered tool: {name}")
            self._notify_change("unregistered", tool)

    def clear(self):
        for name in list(self._tools):
            self.unregister(name)

    def get(self, name: str) -> McpTool | None:
        return self._tools.get(name)

    def list_tools(self) -> list[McpTool]:
        return list(self._tools.values())

    def on_change(self, handler: Callable[[str, McpTool], None]):
        self._change_handlers.append(handler)

    def _notify_change(self, action: str, tool: McpTool):
        for handler in self._change_handlers:
            try:
                handler(action, tool)
            except Exception as e:
                logger.error(f"Change handler error: {e}")

    def to_openai_tools(self) -> list[dict]:
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                },
            }
            for tool in self._tools.values()
        ]

    def to_claude_tools(self) -> list[dict]:
        return [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema,
            }
            for tool in self._tools.values()
        ]

    def to_gemini_tools(self) -> list[dict]:
        # Maps top-level properties only; nested objects, array items and enums are not converted
        result = []
        for tool in self._tools.values():
            params = tool.input_schema.get("properties", {})
            required = tool.input_schema.get("required", [])
            properties = {}
            for param_name, param_schema in params.items():
                prop = {"type": self._json_type_to_gemini(param_schema.get("type", "string"))}
                if param_schema.get("description"):
                    prop["description"] = param_schema["description"]
                properties[param_name] = prop
            result.append({
                "name": tool.name,
                "description": tool.description,
                "parameters": {
                    "type": "OBJECT",
                    "properties": properties,
                    "required": [name for name in required if name in properties],
                },
            })
        return result

    @staticmethod
    def _json_type_to_gemini(json_type: str | list[str]) -> str:
        if isinstance(json_type, list):
            # e.g. ["string", "null"]: use the first non-null type
            json_type = next((t for t in json_type if t != "null"), "string")
        mapping = {
            "string": "STRING",
            "integer": "INTEGER",
            "number": "NUMBER",
            "boolean": "BOOLEAN",
            "array": "ARRAY",
            "object": "OBJECT",
        }
        return mapping.get(json_type, "STRING")


class ToolDiscovery:
    def __init__(self, client):
        self._client = client
        self._registry = ToolRegistry()
        self._refresh_task: asyncio.Task | None = None
        self._client.on_notification(
            "notifications/tools/list_changed",
            self._on_tools_changed,
        )

    @property
    def registry(self) -> ToolRegistry:
        return self._registry

    async def _fetch_tools(self) -> list[McpTool]:
        result = await self._client.send_request("tools/list")
        return [McpTool.from_mcp(tool_data) for tool_data in result.get("tools", [])]

    async def discover(self) -> list[McpTool]:
        tools = await self._fetch_tools()
        for tool in tools:
            self._registry.register(tool)
        logger.info(f"Discovered {len(tools)} tools")
        return tools

    async def _on_tools_changed(self, params: dict):
        logger.info("Tools list changed, re-discovering...")
        # Refresh in a separate task so this handler never blocks the transport's read loop
        self._refresh_task = asyncio.create_task(self._refresh())

    async def _refresh(self):
        try:
            tools = await self._fetch_tools()
        except Exception as e:
            logger.error(f"Tool re-discovery failed: {e}")
            return
        # Swap only after the new list has arrived, so the registry is never left empty
        self._registry.clear()
        for tool in tools:
            self._registry.register(tool)

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        if not self._registry.get(name):
            raise ValueError(f"Tool not found: {name}")
        result = await self._client.send_request("tools/call", {
            "name": name,
            "arguments": arguments,
        })
        text_parts = []
        for content in result.get("content", []):
            if content.get("type") == "text":
                text_parts.append(content["text"])
            elif content.get("type") == "image":
                text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
            elif content.get("type") == "resource":
                text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")
        text = "\n".join(text_parts) if text_parts else str(result)
        if result.get("isError"):
            # Tool-level failures come back as a normal result flagged with isError
            raise ToolExecutionError(f"Tool '{name}' failed: {text}")
        return text
Dynamic registration in use
import asyncio


async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        tools = await discovery.discover()
        print(f"Discovered {len(tools)} tools:")
        for tool in tools:
            print(f"  - {tool.name}: {tool.description}")
        openai_tools = discovery.registry.to_openai_tools()
        print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2, ensure_ascii=False)}")
        claude_tools = discovery.registry.to_claude_tools()
        print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2, ensure_ascii=False)}")
        result = await discovery.call_tool("search_docs", {"query": "MCP protocol parsing"})
        print(f"\nTool result: {result}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
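Pattern 4: Sequential Multi-Tool Orchestration
Problem: An AI Agent needs to call several tools in order, and each tool's input depends on the previous tool's output. How do you manage such a sequential dependency chain cleanly?
Solution: Implement a sequential orchestrator that supports dependency injection, passing intermediate results between steps, and conditional branching.
Sequential orchestration flow
User input: "Search the MCP docs and translate the summary"
     │
     ▼
┌──────────┐   result   ┌──────────┐   result   ┌──────────┐
│  Tool 1  │ ─────────> │  Tool 2  │ ─────────> │  Tool 3  │
│  Search  │            │ Extract  │            │Translate │
│   docs   │            │ summary  │            │ content  │
└──────────┘            └──────────┘            └──────────┘
     │                       │                       │
     │ $steps.search         │ $steps.extract        │ $steps.translate
     ▼                       ▼                       ▼
Context Store           Context Store           Context Store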
Full implementation
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class StepResult:
    step_name: str
    tool_name: str
    input_args: dict[str, Any]
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


@dataclass
class PipelineStep:
    name: str
    tool_name: str
    # Either a dict (string values starting with "$" are context references)
    # or a callable that builds the arguments from the context
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    condition: Callable[[dict[str, Any]], bool] | None = None
    timeout: float = 30.0
    retry_count: int = 0

    def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
        if callable(self.arguments):
            return self.arguments(context)
        resolved = {}
        for key, value in self.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                resolved[key] = _deep_get(context, value[1:])
            else:
                resolved[key] = value
        return resolved


def _deep_get(data: dict, path: str) -> Any:
    # Step outputs are stored under flat keys such as "steps.search", so try the whole path first
    if path in data:
        return data[path]
    current: Any = data
    for key in path.split("."):
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else None
        else:
            return None
    return current


class SequentialOrchestrator:
    def __init__(self, discovery: "ToolDiscovery"):
        self._discovery = discovery
        self._steps: list[PipelineStep] = []
        self._context: dict[str, Any] = {}

    def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
        self._steps.append(step)
        return self

    def set_context(self, key: str, value: Any):
        self._context[key] = value

    async def execute(self) -> list[StepResult]:
        results = []
        context = dict(self._context)
        for step in self._steps:
            if step.condition and not step.condition(context):
                logger.info(f"Skipping step '{step.name}': condition not met")
                continue
            resolved_args = step.resolve_arguments(context)
            logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")
            start = time.monotonic()
            attempt = 0
            last_error = None
            while attempt <= step.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(step.tool_name, resolved_args),
                        timeout=step.timeout,
                    )
                    # Later steps can reference this output as "$steps.<name>" or "$last_result"
                    context[f"steps.{step.name}"] = output
                    context["last_result"] = output
                    results.append(StepResult(
                        step_name=step.name,
                        tool_name=step.tool_name,
                        input_args=resolved_args,
                        output=output,
                        success=True,
                        duration_ms=(time.monotonic() - start) * 1000,
                    ))
                    break
                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= step.retry_count:
                        logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
                        await asyncio.sleep(1.0 * attempt)  # linear backoff
                    else:
                        results.append(StepResult(
                            step_name=step.name,
                            tool_name=step.tool_name,
                            input_args=resolved_args,
                            output=None,
                            success=False,
                            error=last_error,
                            duration_ms=(time.monotonic() - start) * 1000,
                        ))
        return results
Usage example
async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        orchestrator = SequentialOrchestrator(discovery)
        orchestrator.set_context("user_query", "MCP protocol parsing and Python client integration")
        orchestrator.add_step(PipelineStep(
            name="search",
            tool_name="search_docs",
            arguments={"query": "$user_query", "limit": 5},
        ))
        orchestrator.add_step(PipelineStep(
            name="extract",
            tool_name="extract_summary",
            arguments={"text": "$steps.search"},
            condition=lambda ctx: bool(ctx.get("steps.search")),
        ))
        orchestrator.add_step(PipelineStep(
            name="translate",
            tool_name="translate_text",
            arguments={"text": "$steps.extract", "target_lang": "en"},
            condition=lambda ctx: bool(ctx.get("steps.extract")),
        ))
        results = await orchestrator.execute()
        for r in results:
            status = "✓" if r.success else "✗"
            print(f"{status} [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
            if r.error:
                print(f"    Error: {r.error}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
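Pattern 5: Parallel Multi-Tool Orchestration
Problem: When tool calls do not depend on each other, running them one after another wastes time. How do you call several MCP tools in parallel safely and aggregate the results?
Solution: Build a parallel orchestrator on asyncio that supports concurrency limits, result aggregation, and DAG dependency execution.
Parallel orchestration architecture
                   ┌──────────────┐
                   │ User request │
                   └──────┬───────┘
                          │
           ┌──────────────┼──────────────┐
           │              │              │
           ▼              ▼              ▼
      ┌──────────┐   ┌──────────┐   ┌──────────┐
      │  Tool A  │   │  Tool B  │   │  Tool C  │
      │  Search  │   │ Query DB │   │ Call API │
      │   docs   │   │          │   │          │
      └────┬─────┘   └────┬─────┘   └────┬─────┘
           │              │              │
           └──────────────┼──────────────┘
                          │
                          ▼
                ┌───────────────────┐
                │ Aggregate results │
                └───────────────────┘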
Full implementation
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class ParallelTask:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    depends_on: list[str] = field(default_factory=list)
    timeout: float = 30.0
    retry_count: int = 0


@dataclass
class TaskResult:
    name: str
    tool_name: str
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


class ParallelOrchestrator:
    def __init__(self, discovery: "ToolDiscovery", max_concurrency: int = 5):
        self._discovery = discovery
        self._max_concurrency = max_concurrency
        self._tasks: list[ParallelTask] = []

    def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
        self._tasks.append(task)
        return self

    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        context = dict(initial_context or {})
        results: dict[str, TaskResult] = {}
        semaphore = asyncio.Semaphore(self._max_concurrency)
        completed: set[str] = set()
        remaining = list(self._tasks)
        while remaining:
            # Each round runs every task whose dependencies have all finished (one DAG level)
            ready = [t for t in remaining if all(dep in completed for dep in t.depends_on)]
            if not ready:
                logger.error(f"Circular or unknown dependency among: {[t.name for t in remaining]}")
                break
            coros = [self._execute_task(task, context, results, semaphore) for task in ready]
            task_results = await asyncio.gather(*coros, return_exceptions=True)
            for task, result in zip(ready, task_results):
                if isinstance(result, BaseException):
                    result = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=str(result),
                    )
                results[task.name] = result
                if result.success:
                    context[f"tasks.{task.name}"] = result.output
                completed.add(task.name)
                remaining.remove(task)
        return results

    async def _execute_task(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
        semaphore: asyncio.Semaphore,
    ) -> TaskResult:
        failed_deps = [dep for dep in task.depends_on if not results[dep].success]
        if failed_deps:
            # Skip instead of calling the tool with missing inputs
            return TaskResult(
                name=task.name,
                tool_name=task.tool_name,
                output=None,
                success=False,
                error=f"Skipped: dependencies failed: {', '.join(failed_deps)}",
            )
        async with semaphore:  # caps the number of concurrent tool calls
            resolved_args = self._resolve_arguments(task, context, results)
            start = time.monotonic()
            attempt = 0
            last_error = None
            while attempt <= task.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(task.tool_name, resolved_args),
                        timeout=task.timeout,
                    )
                    return TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=output,
                        success=True,
                        duration_ms=(time.monotonic() - start) * 1000,
                    )
                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= task.retry_count:
                        await asyncio.sleep(0.5 * attempt)
            return TaskResult(
                name=task.name,
                tool_name=task.tool_name,
                output=None,
                success=False,
                error=last_error,
                duration_ms=(time.monotonic() - start) * 1000,
            )

    def _resolve_arguments(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
    ) -> dict[str, Any]:
        if callable(task.arguments):
            return task.arguments(context)
        resolved = {}
        for key, value in task.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                if ref_path.startswith("tasks."):
                    dep_result = results.get(ref_path.split(".")[1])
                    resolved[key] = dep_result.output if dep_result else None
                else:
                    resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


class DagOrchestrator(ParallelOrchestrator):
    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        # Fail fast on an invalid graph instead of discovering the problem mid-run
        errors = self.validate_dag()
        if errors:
            raise ValueError("Invalid DAG: " + "; ".join(errors))
        return await super().execute(initial_context)

    def validate_dag(self) -> list[str]:
        errors = []
        task_names = {t.name for t in self._tasks}
        for task in self._tasks:
            for dep in task.depends_on:
                if dep not in task_names:
                    errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")
        visited: set[str] = set()
        path: set[str] = set()

        def has_cycle(name: str) -> bool:
            # Depth-first search: reaching a node already on the current path means a cycle
            visited.add(name)
            path.add(name)
            task = next((t for t in self._tasks if t.name == name), None)
            if task:
                for dep in task.depends_on:
                    if dep in path:
                        return True
                    if dep not in visited and has_cycle(dep):
                        return True
            path.remove(name)
            return False

        for task in self._tasks:
            if task.name not in visited and has_cycle(task.name):
                errors.append("Circular dependency detected in DAG")
                break
        return errors
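`ParallelOrchestrator` runs the DAG level by level, so a task whose dependencies finished early still waits for the slowest task in the previous level. Python's standard `graphlib.TopologicalSorter` makes it straightforward to start each task as soon as its own dependencies are done. The sketch below is minimal and uses simulated tool calls. `run_dag`, the task names, and the `asyncio.sleep` stand-ins are assumptions; this is not the orchestrator above.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Awaitable, Callable


async def run_dag(graph: dict[str, set[str]], run: Callable[[str], Awaitable[None]]) -> list[str]:
    """Start each node as soon as all of its predecessors have finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    finished: list[str] = []
    running: dict[asyncio.Task, str] = {}
    while sorter.is_active():
        for node in sorter.get_ready():
            running[asyncio.create_task(run(node))] = node
        done, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            node = running.pop(task)
            task.result()  # re-raise the node's exception, if any
            finished.append(node)
            sorter.done(node)  # may unlock successors for the next get_ready()
    return finished


# Hypothetical graph: node -> set of nodes it depends on
GRAPH = {
    "summarize_docs": {"search_docs"},
    "merge_results": {"search_web", "query_db", "summarize_docs"},
}
DURATIONS = {"search_web": 0.05, "search_docs": 0.01, "query_db": 0.02,
             "summarize_docs": 0.01, "merge_results": 0.01}


async def fake_tool(name: str) -> None:
    await asyncio.sleep(DURATIONS[name])  # stands in for a tools/call round trip


order = asyncio.run(run_dag(GRAPH, fake_tool))
print(order)
```

Here `summarize_docs` starts as soon as `search_docs` finishes, without waiting for the slower `search_web`. With the level-by-level loop, it would wait.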
Usage example
async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)
        orchestrator.add_task(ParallelTask(
            name="search_web",
            tool_name="web_search",
            arguments={"query": "MCP protocol parsing 2026"},
        ))
        orchestrator.add_task(ParallelTask(
            name="search_docs",
            tool_name="search_docs",
            arguments={"query": "Model Context Protocol Python"},
        ))
        orchestrator.add_task(ParallelTask(
            name="query_db",
            tool_name="query_database",
            arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
        ))
        orchestrator.add_task(ParallelTask(
            name="merge_results",
            tool_name="merge_and_deduplicate",
            # A callable builds the list; "$" references resolve only whole string values
            arguments=lambda ctx: {
                "sources": [
                    ctx.get("tasks.search_web"),
                    ctx.get("tasks.search_docs"),
                    ctx.get("tasks.query_db"),
                ],
            },
            depends_on=["search_web", "search_docs", "query_db"],
        ))
        results = await orchestrator.execute({"user_query": "MCP client integration"})
        for name, result in results.items():
            status = "✓" if result.success else "✗"
            print(f"{status} [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
            if result.error:
                print(f"    Error: {result.error}")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
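Pattern 6: Error Handling and Retries
Problem: When the network is unstable, the server is overloaded, or a tool fails during execution, an MCP client needs robust error handling and retries.
Solution: Implement layered error handling, retries with exponential backoff, a circuit breaker, and fallback strategies.
Error handling architecture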
┌────────────────────────────────────────────┐
│            Error Handling Layer            │
│                                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │  Retry   │  │ Circuit  │  │ Fallback │  │
│  │ Strategy │  │ Breaker  │  │ Handler  │  │
│  └──────────┘  └──────────┘  └──────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │         Error Classification         │  │
│  │   Transient / Permanent / Unknown    │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
Full implementation
import asyncio
import json
import logging
import random
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)


class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: list[int] = field(default_factory=lambda: [
        -32603,  # Internal error
        -32000,  # Server error
    ])


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig | None = None):
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0
        self._half_open_calls = 0

    @property
    def state(self) -> CircuitState:
        # After recovery_timeout, an open circuit lets a few trial calls through
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.HALF_OPEN and self._half_open_calls < self._config.half_open_max_calls:
            self._half_open_calls += 1
            return True
        return False

    def record_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._success_count = 0
        else:
            self._failure_count = 0

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            # Any failure during the trial period reopens the circuit
            self._state = CircuitState.OPEN
            self._success_count = 0
        elif self._failure_count >= self._config.failure_threshold:
            self._state = CircuitState.OPEN


class ResilientMcpClient:
    def __init__(
        self,
        client,
        retry_config: RetryConfig | None = None,
        circuit_config: CircuitBreakerConfig | None = None,
    ):
        # `client` must expose call_tool(name, arguments), e.g. ToolDiscovery from Pattern 3
        self._client = client
        self._retry_config = retry_config or RetryConfig()
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_config or CircuitBreakerConfig()
        self._fallback_handlers: dict[str, Callable[[dict[str, Any]], Awaitable[Any]]] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self._circuit_breakers:
            self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
        return self._circuit_breakers[server_name]

    def register_fallback(self, tool_name: str, handler: Callable[[dict[str, Any]], Awaitable[Any]]):
        self._fallback_handlers[tool_name] = handler

    def classify_error(self, error: Exception) -> ErrorCategory:
        # McpTimeoutError is how JsonRpcClient (Pattern 1) reports request timeouts
        if isinstance(error, (ConnectionError, asyncio.TimeoutError, McpTimeoutError, OSError)):
            return ErrorCategory.TRANSIENT
        if isinstance(error, McpError):
            if error.code in self._retry_config.retryable_errors:
                return ErrorCategory.TRANSIENT
            if error.code in (-32600, -32601, -32602):  # invalid request / method / params
                return ErrorCategory.PERMANENT
        return ErrorCategory.UNKNOWN

    async def call_with_retry(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        server_name: str = "default",
    ) -> Any:
        breaker = self._get_breaker(server_name)
        if not breaker.allow_request():
            fallback = self._fallback_handlers.get(tool_name)
            if fallback:
                logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
                return await fallback(arguments)
            raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")
        last_error: Exception | None = None
        for attempt in range(1, self._retry_config.max_attempts + 1):
            try:
                result = await self._client.call_tool(tool_name, arguments)
                breaker.record_success()
                return result
            except Exception as e:
                last_error = e
                category = self.classify_error(e)
                if category == ErrorCategory.PERMANENT:
                    breaker.record_failure()
                    raise
                if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
                    delay = min(
                        self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
                        self._retry_config.max_delay,
                    )
                    if self._retry_config.jitter:
                        delay *= 0.5 + random.random() * 0.5
                    logger.warning(
                        f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
                else:
                    # Unknown error, or a transient error on the last attempt: stop retrying
                    breaker.record_failure()
                    break
        if fallback := self._fallback_handlers.get(tool_name):
            logger.warning(f"All retries exhausted for '{tool_name}', using fallback")
            return await fallback(arguments)
        raise last_error


class CircuitOpenError(Exception):
    pass
Usage example
```python
import asyncio
import json


async def fallback_search(arguments: dict) -> str:
    # Degraded answer returned when the server is unavailable
    return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})


async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        resilient = ResilientMcpClient(
            discovery,
            retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
            circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
        )
        resilient.register_fallback("search_docs", fallback_search)
        try:
            result = await resilient.call_with_retry("search_docs", {"query": "MCP protocol parsing"})
            print(f"Result: {result}")
        except CircuitOpenError as e:
            print(f"Service unavailable: {e}")
        except Exception as e:
            print(f"Failed: {e}")
    finally:
        # Close the server process even if initialization or discovery fails
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```
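Pattern 7: Streaming Response Handling
Problem: long-running tool calls need real-time progress feedback. MCP supports this with notifications such as notifications/progress. The server sends them over the same SSE (or stdio) channel while the tools/call request is still pending.
Solution: implement a stream handler that supports progress notifications, partial-result aggregation, and cancellation.
Streaming response architecture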
```
┌──────────┐  tools/call   ┌──────────┐
│  Client  │ ────────────> │  Server  │
│          │               │          │
│  Stream  │ <── notify ── │   Tool   │
│  Handler │    progress   │  Running │
│          │ <── notify ── │          │
│          │    progress   │          │
│          │ <── result ── │          │
│          │    complete   │          │
└──────────┘               └──────────┘
```
Full implementation
```python
import asyncio
import inspect
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

logger = logging.getLogger(__name__)


class StreamEventType(Enum):
    PROGRESS = "progress"
    PARTIAL_RESULT = "partial_result"
    COMPLETE = "complete"
    ERROR = "error"
    LOG = "log"


@dataclass
class StreamEvent:
    type: StreamEventType
    data: Any
    progress: float | None = None  # 0.0-1.0 when the server reports a total
    message: str | None = None
    # time.monotonic() works without a running event loop
    timestamp: float = field(default_factory=time.monotonic)


@dataclass
class StreamResult:
    events: list[StreamEvent] = field(default_factory=list)
    final_result: Any = None
    success: bool = True
    error: str | None = None
    total_duration_ms: float = 0.0


EventHandler = Callable[[StreamEvent], Any]


class StreamHandler:
    def __init__(self):
        # One handler list per event type; handlers may be sync or async
        self._handlers: dict[StreamEventType, list[EventHandler]] = {t: [] for t in StreamEventType}

    def on_progress(self, handler: EventHandler):
        self._handlers[StreamEventType.PROGRESS].append(handler)

    def on_partial_result(self, handler: EventHandler):
        self._handlers[StreamEventType.PARTIAL_RESULT].append(handler)

    def on_complete(self, handler: EventHandler):
        self._handlers[StreamEventType.COMPLETE].append(handler)

    def on_error(self, handler: EventHandler):
        self._handlers[StreamEventType.ERROR].append(handler)

    def on_log(self, handler: EventHandler):
        self._handlers[StreamEventType.LOG].append(handler)

    async def emit(self, event: StreamEvent):
        for handler in list(self._handlers[event.type]):
            try:
                result = handler(event)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                # A failing UI callback must not break the tool call
                logger.exception(f"{event.type.value} handler error")

    async def handle_notification(self, message: dict[str, Any]):
        # Assumes the whole JSON-RPC notification is passed in: {"method": ..., "params": {...}}
        method = message.get("method", "")
        params = message.get("params") or {}
        if method == "notifications/progress":
            # params: progressToken, progress, optional total and message
            total = params.get("total")
            event = StreamEvent(
                type=StreamEventType.PROGRESS,
                data=params,
                progress=params.get("progress", 0) / total if total else None,
                message=params.get("message"),
            )
        elif method == "notifications/message":
            # params: level, optional logger, and arbitrary JSON data
            level = params.get("level", "info")
            event = StreamEvent(
                type=StreamEventType.LOG,
                data=params.get("data"),
                message=f"[{level}] {params.get('data')}",
            )
        else:
            return
        await self.emit(event)


class StreamingMcpClient:
    def __init__(self, client, discovery: ToolDiscovery):
        self._client = client
        self._discovery = discovery
        self._stream_handler = StreamHandler()
        # Route server notifications into the stream handler
        client.on_notification("notifications/progress", self._stream_handler.handle_notification)
        client.on_notification("notifications/message", self._stream_handler.handle_notification)

    @property
    def stream(self) -> StreamHandler:
        return self._stream_handler

    async def call_with_stream(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        timeout: float = 120.0,
    ) -> StreamResult:
        # Servers only send notifications/progress if the tools/call request
        # carries params._meta.progressToken, so call_tool must include one
        start = time.monotonic()
        result = StreamResult()
        try:
            call_result = await asyncio.wait_for(
                self._discovery.call_tool(tool_name, arguments),
                timeout=timeout,
            )
            result.final_result = call_result
            event = StreamEvent(type=StreamEventType.COMPLETE, data=call_result)
        except asyncio.TimeoutError:
            result.success = False
            result.error = f"Tool call timed out after {timeout}s"
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
        except Exception as e:
            result.success = False
            result.error = str(e)
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
        result.events.append(event)
        # Fire the caller's on_complete / on_error handlers
        await self._stream_handler.emit(event)
        result.total_duration_ms = (time.monotonic() - start) * 1000
        return result

    async def cancel(self, request_id: int | str):
        # Ask the server to stop work on this request; a late response should be ignored
        await self._client.send_notification("notifications/cancelled", {
            "requestId": request_id,
            "reason": "User cancelled",
        })
```
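asyncio.wait_for only stops the local wait. The server keeps working on the request unless the client also sends notifications/cancelled. Below is a minimal sketch that pairs the two, assuming the client can expose the id of the pending request. FakeClient, its request and send_notification methods, and call_or_cancel are hypothetical stand-ins, not the JsonRpcClient API from this article.

```python
import asyncio
import itertools
from typing import Any


class FakeClient:
    """Hypothetical client: requests never finish, notifications are recorded."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.sent: list[tuple[str, dict]] = []

    def next_id(self) -> int:
        return next(self._ids)

    async def request(self, request_id: int, method: str, params: dict) -> Any:
        await asyncio.sleep(3600)  # simulates a tool that runs far too long

    async def send_notification(self, method: str, params: dict) -> None:
        self.sent.append((method, params))


async def call_or_cancel(client: FakeClient, method: str, params: dict, timeout: float) -> Any:
    request_id = client.next_id()
    try:
        return await asyncio.wait_for(client.request(request_id, method, params), timeout)
    except asyncio.TimeoutError:
        # wait_for cancelled only the local task; tell the server to stop as well
        await client.send_notification(
            "notifications/cancelled",
            {"requestId": request_id, "reason": f"Client timeout after {timeout}s"},
        )
        raise


async def demo() -> FakeClient:
    client = FakeClient()
    try:
        await call_or_cancel(client, "tools/call", {"name": "batch_process"}, timeout=0.05)
    except asyncio.TimeoutError:
        pass
    return client


client = asyncio.run(demo())
print(client.sent)
```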
Usage example
```python
def show_progress(e: StreamEvent):
    # e.progress is None when the server sent no total
    pct = f"{e.progress:.0%}" if e.progress is not None else str(e.data.get("progress"))
    print(f"  Progress: {pct} - {e.message or ''}")


async def main():
    client = JsonRpcClient(timeout=120.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        streaming = StreamingMcpClient(client, discovery)
        streaming.stream.on_progress(show_progress)
        streaming.stream.on_log(lambda e: print(f"  Log: {e.message}"))
        streaming.stream.on_complete(lambda e: print("  Complete!"))
        streaming.stream.on_error(lambda e: print(f"  Error: {e.message}"))
        print("Calling long-running tool with streaming...")
        result = await streaming.call_with_stream(
            "batch_process",
            {"items": list(range(100)), "operation": "analyze"},
            timeout=120.0,
        )
        print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
        if result.final_result is not None:
            # final_result may be a dict or list, not only a string
            print(f"Output: {str(result.final_result)[:200]}...")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```
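5 Common Pitfalls and Fixes
1. Missing the initialized notification after the handshake
Pitfall: after sending the initialize request, the client never sends the notifications/initialized notification. The server keeps waiting for the handshake to finish, and every later request times out.
Solution:
```python
async def safe_initialize(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "1.0.0"},
    })
    # The server may answer with a different protocolVersion; check
    # result["protocolVersion"] if you rely on newer protocol features
    # Required: send the initialized notification before any other request
    await client.send_notification("notifications/initialized")
    return result
```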
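2. Sending requests before the SSE endpoint event arrives
Pitfall: the client sends JSON-RPC requests as soon as the SSE connection opens. The endpoint event has not arrived yet, so the requests go to the wrong URL.
Solution:
```python
import asyncio
from typing import Any
from urllib.parse import urljoin


class SafeSseClient:
    def __init__(self, server_url: str):
        self._server_url = server_url
        self._transport = SseTransport(server_url)
        self._endpoint_ready = asyncio.Event()
        self._message_url: str | None = None
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, data: str):
        # The endpoint event's data is the URL to POST messages to, often relative
        self._message_url = urljoin(self._server_url, data.strip())
        self._endpoint_ready.set()

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        # Wait for the endpoint instead of guessing the POST URL
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
        # Now it is safe to POST the request to self._message_url
        ...
```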
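In the HTTP+SSE transport, the endpoint event is sent on the stream, and its data is the URL the client must POST JSON-RPC messages to. That URL is usually relative to the SSE URL. Below is a minimal sketch that parses raw SSE text and resolves the endpoint with urllib.parse.urljoin. The parse_sse helper, the session id and the paths are illustrative assumptions.

```python
from urllib.parse import urljoin


def parse_sse(text: str) -> list[tuple[str, str]]:
    """Split raw SSE text into (event, data) pairs; events are separated by blank lines."""
    events = []
    for block in text.replace("\r\n", "\n").split("\n\n"):
        event, data_lines = "message", []  # "message" is the default SSE event type
        for line in block.split("\n"):
            if line.startswith(":"):
                continue  # comment / keep-alive line
            name, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if name == "event":
                event = value
            elif name == "data":
                data_lines.append(value)
        if data_lines:
            events.append((event, "\n".join(data_lines)))
    return events


raw = (
    ": keep-alive\n\n"
    "event: endpoint\n"
    "data: /messages/?session_id=abc123\n\n"
    "event: message\n"
    'data: {"jsonrpc": "2.0", "id": 1, "result": {}}\n\n'
)
events = parse_sse(raw)
endpoint = next(data for event, data in events if event == "endpoint")
print(urljoin("http://localhost:8000/sse", endpoint))
```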
3. Losing the required field when converting tool schemas
Pitfall: when converting an MCP tool schema to the OpenAI function-calling format, the code passes inputSchema straight through as parameters. Some LLMs require a required field to be present.
Solution:
```python
def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
    # Shallow copy, so the tool's original schema is not mutated
    schema = dict(mcp_tool.input_schema)
    schema.setdefault("type", "object")
    schema.setdefault("required", [])
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description or "",
            "parameters": schema,
        },
    }
```
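4. Race conditions on shared state during parallel calls
Pitfall: several parallel tool calls share one httpx.AsyncClient or one dict of pending asyncio.Future objects. When request IDs are not unique across those calls, responses are delivered to the wrong caller.
Solution: give every in-flight request its own unique ID, or isolate calls with a connection pool:
```python
import asyncio


class IsolatedClientPool:
    def __init__(self, base_url: str, pool_size: int = 5):
        self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
        self._index = 0
        self._lock = asyncio.Lock()

    async def get_client(self) -> McpSseClient:
        # Round-robin over the pool
        async with self._lock:
            client = self._clients[self._index % len(self._clients)]
            self._index += 1
            return client
```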
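Mismatched responses are prevented by one property: every in-flight request has a unique id and its own Future. Below is a minimal self-contained sketch with one itertools.count per connection, where responses are routed by id in whatever order they return. The Correlator class and the out-of-order fake server are hypothetical, for illustration only.

```python
import asyncio
import itertools
import random


class Correlator:
    """Matches JSON-RPC responses to callers by id (single event loop)."""

    def __init__(self):
        self._ids = itertools.count(1)  # unique ids for this connection
        self._pending: dict[int, asyncio.Future] = {}

    def new_request(self, method: str, params: dict) -> tuple[dict, asyncio.Future]:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params}, future

    def on_response(self, message: dict) -> None:
        future = self._pending.pop(message["id"], None)
        if future is not None and not future.done():
            future.set_result(message["result"])


async def fake_server(correlator: Correlator, request: dict) -> None:
    # Reply after a random delay so responses arrive out of order
    await asyncio.sleep(random.random() / 100)
    correlator.on_response({"jsonrpc": "2.0", "id": request["id"], "result": request["params"]["query"]})


async def call(correlator: Correlator, query: str) -> str:
    request, future = correlator.new_request("tools/call", {"query": query})
    # Keep a reference so the task is not garbage-collected mid-flight
    server_task = asyncio.create_task(fake_server(correlator, request))
    result = await future
    await server_task
    return result


async def main() -> list[str]:
    correlator = Correlator()
    return await asyncio.gather(*(call(correlator, f"q{i}") for i in range(20)))


results = asyncio.run(main())
print(results == [f"q{i}" for i in range(20)])
```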
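5. Confusing progress notifications with results in streaming responses
Pitfall: progress notifications and the final result share one SSE stream, and the client treats a progress notification as the final result.
Solution: distinguish message types strictly. Only a message with an id and no method is a response to one of your requests:
```python
async def _handle_sse_message(self, message: dict):
    if "id" in message and "method" not in message:
        # A response to one of our requests: resolve the matching pending future
        future = self._pending.pop(message["id"], None)
        if future and not future.done():
            if "error" in message:
                err = message["error"]
                # Assumes McpError takes (code, message); adjust to its real constructor
                future.set_exception(McpError(err.get("code"), err.get("message", "")))
            else:
                future.set_result(message.get("result"))
    elif "method" in message and "id" not in message:
        # A notification (progress, log, ...): hand it to the stream handler
        await self._stream_handler.handle_notification(message)
    elif "method" in message:
        # A server-initiated request (e.g. ping) has both id and method; it needs a reply,
        # not a pending future. _handle_server_request is a hypothetical handler.
        await self._handle_server_request(message)
```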
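10 Common Errors and How to Fix Them
| Error message | Cause | Fix |
|---|---|---|
| `MCP Error [-32600]: Invalid Request` | The JSON-RPC message is malformed or missing required fields | Check that the request has `jsonrpc`, `method` and `id` fields; validate the format with /zh-TW/json/format |
| `MCP Error [-32601]: Method not found` | Called a method the server does not support | Check the capabilities the server returned from `initialize`, and the spelling of the method name |
| `MCP Error [-32602]: Invalid params` | Tool argument types or structure do not match the schema | Fetch the correct `inputSchema` with `tools/list` and check the arguments against it |
| `Connection refused` | The MCP server is not running, or the port is wrong | Confirm the server process is running; check the port and URL configuration |
| `SSE connection timeout` | Network unreachable, or the server did not answer the SSE handshake | Check firewall rules; confirm the SSE endpoint is reachable |
| `Request timed out after 30s` | Tool execution took longer than the client timeout | Increase the `timeout` parameter, or use streaming responses to receive progress |
| `Circuit breaker open` | Consecutive failures reached the breaker threshold | Check server health; retry after the recovery timeout |
| `Tool not found: xxx` | Called a tool name that is not registered | Rerun `tools/list` discovery; check the tool name's case |
| `Endpoint not received` | No `endpoint` event arrived after the SSE connection opened | Confirm the server sends the `endpoint` event; check the SSE route configuration |
| `JSON decode error in SSE stream` | SSE data is not valid JSON | Check the server's SSE event format; make sure every `data` field is valid JSON |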
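Many `-32602 Invalid params` errors can be caught before the request leaves the client. Below is a minimal sketch, assuming the tool's inputSchema is a flat JSON Schema object. It checks only required keys and the primitive type of each declared property. A full validator such as the jsonschema package covers far more of the spec. The check_arguments name and the sample schema are illustrative.

```python
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def check_arguments(input_schema: dict, arguments: dict) -> list[str]:
    """Return readable problems; an empty list means the basic checks passed."""
    problems = []
    for name in input_schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required argument '{name}'")
    properties = input_schema.get("properties", {})
    for name, value in arguments.items():
        expected = properties.get(name, {}).get("type")
        python_type = JSON_TYPES.get(expected) if isinstance(expected, str) else None
        if python_type is None:
            continue  # undeclared property or union type: not checked in this sketch
        # bool is a subclass of int in Python, so reject it explicitly for numbers
        if isinstance(value, bool) and expected in ("integer", "number"):
            problems.append(f"'{name}' should be {expected}, got boolean")
        elif not isinstance(value, python_type):
            problems.append(f"'{name}' should be {expected}, got {type(value).__name__}")
    return problems


schema = {
    "type": "object",
    "properties": {"query": {"type": "string"}, "limit": {"type": "integer"}},
    "required": ["query"],
}
print(check_arguments(schema, {"query": "MCP", "limit": 5}))
print(check_arguments(schema, {"limit": "5"}))
```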
Advanced Optimization Tips
1. Connection pooling and multi-server management
In production, an AI agent usually needs to connect to several MCP servers at once. Manage the client instances with a connection pool:
```python
from typing import Any


class McpConnectionPool:
    def __init__(self):
        self._connections: dict[str, Any] = {}
        self._discovery_map: dict[str, ToolDiscovery] = {}

    async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
        client = JsonRpcClient(timeout=30.0)
        await client.connect_stdio(command, args)
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    async def connect_sse(self, name: str, url: str):
        client = McpSseClient(url)
        await client.connect()
        await client.initialize()
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    def get_discovery(self, name: str) -> ToolDiscovery | None:
        return self._discovery_map.get(name)

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        # Routes to the first server that registered the tool; if two servers
        # expose the same name, the one connected first wins
        for discovery in self._discovery_map.values():
            if discovery.registry.get(tool_name):
                return await discovery.call_tool(tool_name, arguments)
        raise ValueError(f"Tool '{tool_name}' not found in any connected server")

    async def close_all(self):
        for client in self._connections.values():
            await client.close()
        self._connections.clear()
        self._discovery_map.clear()
```
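McpConnectionPool.call_tool scans servers in connection order, so a duplicate tool name silently resolves to the first server. An alternative is to build an explicit routing table up front, as in the minimal sketch below. The build_tool_index function, the `<server>__<tool>` naming scheme and the sample tool names are illustrative assumptions. None of them are part of MCP.

```python
def build_tool_index(server_tools: dict[str, list[str]]) -> dict[str, tuple[str, str]]:
    """Map each exposed name to (server, original tool name).

    Unique tool names are exposed as-is; names offered by several servers are
    exposed only as "<server>__<tool>", so the LLM has to pick one explicitly.
    """
    owners: dict[str, list[str]] = {}
    for server, tools in server_tools.items():
        for tool in tools:
            owners.setdefault(tool, []).append(server)
    index: dict[str, tuple[str, str]] = {}
    for tool, servers in owners.items():
        if len(servers) == 1:
            index[tool] = (servers[0], tool)
        else:
            for server in servers:
                index[f"{server}__{tool}"] = (server, tool)
    return index


index = build_tool_index({
    "docs": ["search", "fetch_page"],
    "github": ["search", "create_issue"],
})
for name, (server, tool) in sorted(index.items()):
    print(f"{name:14} -> {server}.{tool}")
```

Rebuild the index whenever a server's tool list changes, for example after a notifications/tools/list_changed notification. A lookup then yields the target server and the original tool name in one step.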
2. Tool call caching
Cache the results of idempotent tool calls to avoid repeating identical requests:
```python
import hashlib
import json
import time
from typing import Any


class ToolCallCache:
    def __init__(self, ttl: float = 300.0, max_size: int = 1000):
        self._cache: dict[str, tuple[float, Any]] = {}
        self._ttl = ttl
        self._max_size = max_size

    def _make_key(self, tool_name: str, arguments: dict) -> str:
        # sort_keys makes the key independent of argument order; the tool-name
        # prefix lets invalidate() find every entry for one tool
        raw = json.dumps(arguments, sort_keys=True)
        return f"{tool_name}:{hashlib.sha256(raw.encode()).hexdigest()}"

    def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
        key = self._make_key(tool_name, arguments)
        entry = self._cache.get(key)
        if entry is not None:
            ts, value = entry
            if time.monotonic() - ts < self._ttl:
                return True, value
            del self._cache[key]  # expired
        return False, None

    def set(self, tool_name: str, arguments: dict, value: Any):
        key = self._make_key(tool_name, arguments)
        if key not in self._cache and len(self._cache) >= self._max_size:
            # Evict the oldest entry (O(n) scan, fine for small caches)
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        self._cache[key] = (time.monotonic(), value)

    def invalidate(self, tool_name: str, arguments: dict | None = None):
        if arguments is not None:
            self._cache.pop(self._make_key(tool_name, arguments), None)
        else:
            prefix = f"{tool_name}:"
            for k in [k for k in self._cache if k.startswith(prefix)]:
                del self._cache[k]
```
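The cache is only safe for tools whose results can be repeated without side effects. Since the 2025-03-26 revision, an MCP tool definition may carry an annotations object with hints such as readOnlyHint. The spec treats these hints as untrusted unless the server itself is trusted. Below is a minimal sketch of an opt-in policy built on that hint. The is_cacheable function, the explicit allowlist and the sample tool definitions are illustrative assumptions.

```python
def is_cacheable(tool: dict, trusted_server: bool, allowlist: frozenset[str] = frozenset()) -> bool:
    """Decide whether results of this tool may be served from cache (illustrative policy)."""
    if tool["name"] in allowlist:
        return True  # explicitly vetted by the application
    if not trusted_server:
        return False  # annotations from untrusted servers are only hints
    annotations = tool.get("annotations") or {}
    # Read-only tools have no side effects, so repeating or skipping a call is harmless
    return annotations.get("readOnlyHint") is True


tools = [
    {"name": "search_docs", "annotations": {"readOnlyHint": True}},
    {"name": "send_email", "annotations": {"readOnlyHint": False, "destructiveHint": True}},
    {"name": "get_weather"},
]
print([t["name"] for t in tools if is_cacheable(t, trusted_server=True)])
```

The sketch relies on readOnlyHint rather than idempotentHint, because an idempotent write still changes state on its first call.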
3. Observability and tracing
Add OpenTelemetry tracing and metrics to MCP client calls:
```python
import time
from typing import Any

from opentelemetry import metrics, trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")
tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram("mcp.tool.duration", unit="s", description="MCP tool call duration")


class ObservableMcpClient:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        # record_exception=False: the exception is recorded explicitly below, not twice
        with tracer.start_as_current_span(f"mcp.tool.{tool_name}", record_exception=False) as span:
            span.set_attribute("mcp.tool.name", tool_name)
            span.set_attribute("mcp.tool.args_count", len(arguments))
            start = time.monotonic()
            status = "success"
            try:
                result = await self._discovery.call_tool(tool_name, arguments)
                span.set_status(Status(StatusCode.OK))
                return result
            except Exception as e:
                status = "error"
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
            finally:
                # Count and time every call once, whatever the outcome
                tool_call_counter.add(1, {"tool": tool_name, "status": status})
                tool_call_duration.record(time.monotonic() - start, {"tool": tool_name})
```
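Comparison: 3 Orchestration Approaches
| Dimension | Serial | Parallel | DAG |
|---|---|---|---|
| Execution | Sequential; each step starts after the previous one finishes | All tasks run at the same time | Runs in topological order of dependencies |
| Use case | Strict data dependencies between tools | Tools fully independent | Some tools depend on others, some are independent |
| Latency | Sum of all tool durations | Duration of the slowest tool | Sum of durations along the critical path |
| Error propagation | A failed step skips all later steps | One failure does not affect the others | Nodes downstream of a failure are skipped |
| Implementation complexity | Low | Medium | High |
| Resource usage | Low (one connection at a time) | High (needs a concurrent connection pool) | Medium (concurrency per level) |
| Result ordering | Strong (deterministic order) | Weak (completion order is nondeterministic) | Medium (order within a level is nondeterministic) |
| Timeout control | Per-step timeout | Global timeout plus per-task timeout | Per-step plus global timeout |
| Typical scenario | Search → extract → translate | Search 3 data sources at once | Search A + B → merge → translate |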
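The latency row can be checked with a worked example. The minimal sketch below uses the DAG scenario from the last row (search A + search B → merge → translate). The durations are made-up numbers, and the model ignores scheduling and network overhead.

```python
from graphlib import TopologicalSorter

# Hypothetical durations in seconds, and each step's dependencies
durations = {"search_a": 2.0, "search_b": 3.0, "merge": 0.5, "translate": 1.5}
deps = {
    "search_a": set(),
    "search_b": set(),
    "merge": {"search_a", "search_b"},
    "translate": {"merge"},
}

serial = sum(durations.values())  # one step after another
parallel = max(durations.values())  # lower bound; only reachable if no step depends on another

# DAG: a step finishes when its slowest dependency has finished, plus its own duration
finish: dict[str, float] = {}
for step in TopologicalSorter(deps).static_order():
    finish[step] = max((finish[d] for d in deps[step]), default=0.0) + durations[step]
dag = max(finish.values())  # length of the critical path

print(f"serial={serial}s parallel={parallel}s dag={dag}s")
```

Here the DAG schedule (5.0s, via search_b → merge → translate) falls between serial execution (7.0s) and the fully parallel bound (3.0s). That bound cannot be reached because translate must wait for merge.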
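Recommendations:
- Strict causal dependencies between tools → serial orchestration
- Tools fully independent → parallel orchestration
- Partial dependencies between tools → DAG orchestration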
For more on AI agent workflow orchestration, see AI Agent Workflow DAG Orchestration and AI Agent Protocol Comparison.
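Recommended online tools
- JSON formatter: when debugging MCP protocol messages, format JSON-RPC requests and responses with /zh-TW/json/format
- cURL to code: when testing MCP server endpoints, generate Python client code quickly with /zh-TW/dev/curl-to-code
- Base64 encoder/decoder: when handling binary data in MCP transport, encode or decode it with /zh-TW/encode/base64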
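Summary: integrating a Python MCP client takes far more than sending JSON-RPC requests. Each stage needs production-grade engineering:
- Connections: basic stdio and remote SSE transport.
- Tools: discovery and dynamic registration.
- Orchestration: serial, parallel, and DAG execution of multiple tools.
- Reliability: error handling with circuit breaking and fallbacks.
- Feedback: streaming responses and progress tracking.

With these 7 patterns, your AI agent gains reliable, efficient, and observable tool calling. Remember: protocol parsing is the foundation, multi-tool orchestration is the core, error handling is the safeguard, and streaming responses are the user experience. See the official Model Context Protocol Specification for the latest protocol definitions.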