Python MCPクライアント統合実践:プロトコル解析からマルチツールオーケストレーションまでの7つの本番パターン
なぜAIアプリケーションは外部ツールに接続できないのか?MCPクライアント統合の5つのペインポイント
3日かけて完璧なMCP Serverを構築したのに、Claude Desktopが接続できない?AI Agentが3つのツールを呼び出し、最初の1つがタイムアウトすると残りも全て失敗する?ツールディスカバリが大量のスキーマを返すが、LLMで使えるfunction calling形式に変換する方法がわからない?SSE接続が頻繁に切断・再接続され、データロスに気づかない?マルチツールオーケストレーションでシーケンシャルは遅すぎ、パラレルはカオスになる?
これらの問題の根本原因は:ほとんどの開発者がMCP Serverの開発にのみ注目し、Client側の統合の複雑さを無視していることです。MCPプロトコルは標準通信方式を定義していますが、クライアントはJSON-RPCメッセージ構築、トランスポート適応、ツールディスカバリと変換、オーケストレーション戦略、エラーリカバリなど、一連のエンジニアリング課題を処理する必要があります。
主要ポイント:
- MCPプロトコルJSON-RPC 2.0通信メカニズムをマスターし、信頼性の高いクライアント・サーバー接続を実現
- ツールディスカバリと動的登録を学び、AI Agentが新しいツールに自動適応できるようにする
- マルチツールオーケストレーションの7つの本番パターンを理解し、シーケンシャル呼び出しからパラレルDAG実行まで
- ネットワークジッタやサービス利用不可に対処する堅牢なエラー処理とリトライメカニズムを構築
- SSEストリーミングレスポンス処理をマスターし、リアルタイムのツール呼び出し進捗フィードバックを実現
目次ナビゲーション
- MCPプロトコルコア概念
- Pattern 1: 基本JSON-RPCクライアント
- Pattern 2: SSEトランスポート実装
- Pattern 3: ツールディスカバリと動的登録
- Pattern 4: マルチツールシーケンシャルオーケストレーション
- Pattern 5: マルチツールパラレルオーケストレーション
- Pattern 6: エラー処理とリトライ
- Pattern 7: ストリーミングレスポンス処理
- 5つのよくある落とし穴と解決策
- 10のよくあるエラートラブルシューティング
- 高度な最適化テクニック
- 比較分析:3つのオーケストレーションアプローチ
MCPプロトコルコア概念
7つの本番パターンに深く入る前に、MCPクライアントがプロトコルスタックのどこに位置し、どのようなコア概念があるかを理解する必要があります。
MCPクライアントアーキテクチャ概要
┌─────────────────────────────────────────────────┐
│ AI Application │
│ (Claude Desktop / ChatGPT / カスタムAgent) │
├─────────────────────────────────────────────────┤
│ MCP Client Layer │
│ ┌───────────┐ ┌──────────┐ ┌────────────────┐ │
│ │ JSON-RPC │ │Transport │ │ Tool Registry │ │
│ │ Handler │ │ Adapter │ │ & Discovery │ │
│ └───────────┘ └──────────┘ └────────────────┘ │
│ ┌───────────┐ ┌──────────┐ ┌────────────────┐ │
│ │ Orchest- │ │ Error & │ │ Stream │ │
│ │ ration │ │ Retry │ │ Handler │ │
│ └───────────┘ └──────────┘ └────────────────┘ │
├─────────────────────────────────────────────────┤
│ Transport Layer │
│ stdio / SSE / Streamable HTTP │
├─────────────────────────────────────────────────┤
│ MCP Server │
│ Tools / Resources / Prompts │
└─────────────────────────────────────────────────┘
コア概念リファレンス
| 概念 | 説明 | JSON-RPCメソッド | 例え |
|---|---|---|---|
| Initialize | クライアント・サーバーハンドシェイク | initialize |
TCP 3ウェイハンドシェイク |
| Tool Discovery | サーバーがサポートするツールリストの取得 | tools/list |
DNSルックアップ |
| Tool Call | 特定のツールを呼び出して結果を取得 | tools/call |
HTTP POST |
| Resource Read | サーバーが公開するリソースの読み取り | resources/read |
HTTP GET |
| Prompt Get | 定義済みプロンプトテンプレートの取得 | prompts/get |
テンプレートエンジン |
| Notification | 一方向通知、応答不要 | idフィールドなし | UDPブロードキャスト |
| Cancel | 進行中のリクエストのキャンセル | notifications/cancelled |
HTTPキャンセル |
JSON-RPC 2.0メッセージフォーマット
MCPはJSON-RPC 2.0に基づいています。メッセージフォーマットの理解はクライアント開発の基礎です:
// リクエストメッセージ
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "search_docs",
"arguments": {"query": "MCPプロトコル"}
},
"id": 1
}
// 成功レスポンス
{
"jsonrpc": "2.0",
"result": {
"content": [
{"type": "text", "text": "検索結果..."}
]
},
"id": 1
}
// エラーレスポンス
{
"jsonrpc": "2.0",
"error": {
"code": -32600,
"message": "Invalid Request"
},
"id": 1
}
MCP Server開発に興味がある場合は、以前の記事 Python MCP Server開発ガイド をご覧ください。AI関数呼び出しプロトコルのより深い理解については、AI Function Calling本番実践 をお勧めします。
Pattern 1: 基本JSON-RPCクライアント
問題:AIアプリケーションがMCP Serverに接続する必要があるが、正しいJSON-RPCメッセージの構築、リクエストIDの管理、レスポンスの処理方法がわからない。
解決策:メッセージ構築、送信、レスポンスマッチング、タイムアウト処理をカプセル化する完全なJSON-RPC 2.0クライアントを構築します。
完全な実装
import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable
@dataclass
class JsonRpcRequest:
jsonrpc: str = "2.0"
method: str = ""
params: dict[str, Any] = field(default_factory=dict)
id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)
@dataclass
class JsonRpcResponse:
id: int | str
result: Any = None
error: dict[str, Any] | None = None
class JsonRpcClient:
def __init__(self, timeout: float = 30.0):
self._request_id = 0
self._pending: dict[int | str, asyncio.Future] = {}
self._timeout = timeout
self._notification_handlers: dict[str, Callable] = {}
self._writer: asyncio.StreamWriter | None = None
self._reader: asyncio.StreamReader | None = None
def _next_id(self) -> int:
self._request_id += 1
return self._request_id
async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
import os
proc_env = {**os.environ, **(env or {})}
self._process = await asyncio.create_subprocess_exec(
command,
*(args or []),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=proc_env,
)
self._reader = self._process.stdout
self._writer = self._process.stdin
asyncio.create_task(self._read_loop())
async def _read_loop(self):
while self._reader and not self._reader.at_eof():
line = await self._reader.readline()
if not line:
continue
try:
message = json.loads(line.decode().strip())
await self._handle_message(message)
except json.JSONDecodeError:
continue
async def _handle_message(self, message: dict):
if "id" in message:
future = self._pending.pop(message["id"], None)
if future and not future.done():
if "error" in message:
future.set_exception(
McpError(message["error"]["code"], message["error"]["message"])
)
else:
future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))
elif "method" in message:
handler = self._notification_handlers.get(message["method"])
if handler:
await handler(message.get("params", {}))
async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
request_id = self._next_id()
request = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": request_id,
}
future: asyncio.Future = asyncio.get_event_loop().create_future()
self._pending[request_id] = future
data = json.dumps(request) + "\n"
if self._writer:
self._writer.write(data.encode())
await self._writer.drain()
try:
response = await asyncio.wait_for(future, timeout=self._timeout)
return response.result
except asyncio.TimeoutError:
self._pending.pop(request_id, None)
raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s")
async def send_notification(self, method: str, params: dict[str, Any] | None = None):
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
}
data = json.dumps(notification) + "\n"
if self._writer:
self._writer.write(data.encode())
await self._writer.drain()
def on_notification(self, method: str, handler: Callable):
self._notification_handlers[method] = handler
async def close(self):
if self._writer:
self._writer.close()
if hasattr(self, "_process") and self._process:
self._process.terminate()
await self._process.wait()
class McpError(Exception):
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"MCP Error [{code}]: {message}")
class McpTimeoutError(Exception):
pass
初期化ハンドシェイク
async def initialize_client(client: JsonRpcClient) -> dict:
result = await client.send_request("initialize", {
"protocolVersion": "2025-03-26",
"capabilities": {
"roots": {"listChanged": True},
"sampling": {},
},
"clientInfo": {
"name": "toolsku-mcp-client",
"version": "1.0.0",
},
})
await client.send_notification("notifications/initialized")
return result
使用例
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
server_info = await initialize_client(client)
print(f"Connected to: {server_info['serverInfo']['name']}")
tools = await client.send_request("tools/list")
for tool in tools.get("tools", []):
print(f" - {tool['name']}: {tool['description']}")
result = await client.send_request("tools/call", {
"name": "search_docs",
"arguments": {"query": "MCPプロトコル解析"},
})
print(f"Result: {result}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 2: SSEトランスポート実装
問題:stdioトランスポートはローカルプロセス通信にのみ適しています。本番環境ではネットワーク経由でリモートMCP Serverに接続する必要があります。SSE(Server-Sent Events)はMCPプロトコルがサポートするリモートトランスポート方式です。
解決策:接続管理、イベント解析、メッセージルーティング、自動再接続を含む完全なSSEトランスポート層を実装します。
SSEトランスポートアーキテクチャ
┌──────────────┐ POST /messages ┌──────────────┐
│ MCP Client │ ──────────────────────> │ MCP Server │
│ │ │ │
│ SSE Handler │ <──── GET /sse ──────── │ SSE Endpoint│
│ │ EventStream │ │
└──────────────┘ └──────────────┘
│ │
│ 1. GET /sse でSSE接続を確立 │
│ 2. endpointイベントを受信しPOST URLを取得 │
│ 3. POST /messages でJSON-RPCリクエスト送信│
│ 4. SSE経由でJSON-RPCレスポンスを受信 │
└──────────────────────────────────────────┘
完全な実装
import asyncio
import json
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
import httpx
logger = logging.getLogger(__name__)
@dataclass
class SseEvent:
event: str = "message"
data: str = ""
id: str | None = None
retry: int | None = None
class SseTransport:
def __init__(
self,
server_url: str,
reconnect_interval: float = 5.0,
max_reconnect_attempts: int = 10,
headers: dict[str, str] | None = None,
):
self._server_url = server_url.rstrip("/")
self._reconnect_interval = reconnect_interval
self._max_reconnect_attempts = max_reconnect_attempts
self._headers = headers or {}
self._message_endpoint: str | None = None
self._session: httpx.AsyncClient | None = None
self._running = False
self._reconnect_count = 0
self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
self._last_event_id: str | None = None
async def connect(self):
self._session = httpx.AsyncClient(
headers={**self._headers, "Accept": "text/event-stream"},
timeout=httpx.Timeout(30.0, read=300.0),
)
self._running = True
asyncio.create_task(self._sse_listener())
async def _sse_listener(self):
while self._running:
try:
sse_url = f"{self._server_url}/sse"
params = {}
if self._last_event_id:
params["Last-Event-ID"] = self._last_event_id
async with self._session.stream("GET", sse_url, params=params) as response:
response.raise_for_status()
self._reconnect_count = 0
current_event = SseEvent()
async for line in response.aiter_lines():
if not self._running:
break
if line.startswith(":"):
continue
if line == "":
if current_event.data:
await self._dispatch_event(current_event)
current_event = SseEvent()
continue
if ":" in line:
field_name, _, field_value = line.partition(":")
field_value = field_value.lstrip(" ")
if field_name == "event":
current_event.event = field_value
elif field_name == "data":
if current_event.data:
current_event.data += "\n"
current_event.data += field_value
elif field_name == "id":
current_event.id = field_value
self._last_event_id = field_value
elif field_name == "retry":
try:
current_event.retry = int(field_value)
except ValueError:
pass
except httpx.HTTPStatusError as e:
logger.error(f"SSE HTTP error: {e.response.status_code}")
except httpx.RequestError as e:
logger.error(f"SSE connection error: {e}")
except Exception as e:
logger.error(f"SSE unexpected error: {e}")
if self._running:
self._reconnect_count += 1
if self._reconnect_count > self._max_reconnect_attempts:
logger.error("Max reconnect attempts reached, stopping")
self._running = False
break
wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
await asyncio.sleep(wait)
async def _dispatch_event(self, event: SseEvent):
if event.event == "endpoint":
self._message_endpoint = f"{self._server_url}{event.data}"
logger.info(f"Message endpoint: {self._message_endpoint}")
handlers = self._event_handlers.get(event.event, [])
for handler in handlers:
try:
if event.event == "endpoint":
await handler(event.data)
else:
message = json.loads(event.data)
await handler(message)
except Exception as e:
logger.error(f"Event handler error: {e}")
async def send_message(self, message: dict[str, Any]) -> None:
if not self._message_endpoint:
raise RuntimeError("SSE transport not initialized, endpoint not received")
if not self._session:
raise RuntimeError("SSE session not connected")
response = await self._session.post(
self._message_endpoint,
json=message,
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
def on_event(self, event_type: str, handler: Callable):
if event_type not in self._event_handlers:
self._event_handlers[event_type] = []
self._event_handlers[event_type].append(handler)
async def close(self):
self._running = False
if self._session:
await self._session.aclose()
統合使用
class McpSseClient:
def __init__(self, server_url: str):
self._transport = SseTransport(server_url)
self._request_id = 0
self._pending: dict[int, asyncio.Future] = {}
self._transport.on_event("message", self._on_message)
async def connect(self):
await self._transport.connect()
async def _on_message(self, message: dict):
if "id" in message:
future = self._pending.pop(message["id"], None)
if future and not future.done():
if "error" in message:
future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
else:
future.set_result(message.get("result"))
async def send_request(self, method: str, params: dict | None = None) -> Any:
self._request_id += 1
request = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": self._request_id,
}
future: asyncio.Future = asyncio.get_event_loop().create_future()
self._pending[self._request_id] = future
await self._transport.send_message(request)
try:
return await asyncio.wait_for(future, timeout=60.0)
except asyncio.TimeoutError:
self._pending.pop(self._request_id, None)
raise McpTimeoutError(f"Request {method} timed out")
async def initialize(self) -> dict:
result = await self.send_request("initialize", {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
})
await self._transport.send_message({
"jsonrpc": "2.0",
"method": "notifications/initialized",
})
return result
async def close(self):
await self._transport.close()
async def main():
client = McpSseClient("http://localhost:8080")
await client.connect()
info = await client.initialize()
print(f"Connected: {info}")
tools = await client.send_request("tools/list")
print(f"Available tools: {tools}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 3: ツールディスカバリと動的登録
問題:MCP Serverのツールリストは動的です。クライアントはツールを自動発見し、LLM互換のfunction calling形式に変換する必要があり、ツール変更通知もサポートする必要があります。
解決策:ツールディスカバリ、スキーマ変換、動的登録、変更リスニングを実装します。
ツールディスカバリアーキテクチャ
┌──────────────┐ tools/list ┌──────────────┐
│ MCP Client │ ────────────> │ MCP Server │
│ │ <──────────── │ │
│ Tool │ Tool Schema │ Tool │
│ Registry │ │ Definitions │
│ │ tools/ │ │
│ ┌────────┐ │ list_changed │ │
│ │ OpenAI │ │ <──────────── │ │
│ │ Format │ │ │ │
│ ├────────┤ │ │ │
│ │ Claude │ │ │ │
│ │ Format │ │ │ │
│ ├────────┤ │ │ │
│ │ Gemini │ │ │ │
│ │ Format │ │ │ │
│ └────────┘ │ │ │
└──────────────┘ └──────────────┘
完全な実装
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class McpTool:
name: str
description: str
input_schema: dict[str, Any]
annotations: dict[str, Any] = field(default_factory=dict)
@classmethod
def from_mcp(cls, data: dict) -> "McpTool":
return cls(
name=data["name"],
description=data.get("description", ""),
input_schema=data.get("inputSchema", {}),
annotations=data.get("annotations", {}),
)
class ToolRegistry:
def __init__(self):
self._tools: dict[str, McpTool] = {}
self._change_handlers: list[Callable] = []
def register(self, tool: McpTool):
self._tools[tool.name] = tool
logger.info(f"Registered tool: {tool.name}")
self._notify_change("registered", tool)
def unregister(self, name: str):
if name in self._tools:
tool = self._tools.pop(name)
logger.info(f"Unregistered tool: {name}")
self._notify_change("unregistered", tool)
def get(self, name: str) -> McpTool | None:
return self._tools.get(name)
def list_tools(self) -> list[McpTool]:
return list(self._tools.values())
def on_change(self, handler: Callable):
self._change_handlers.append(handler)
def _notify_change(self, action: str, tool: McpTool):
for handler in self._change_handlers:
try:
handler(action, tool)
except Exception as e:
logger.error(f"Change handler error: {e}")
def to_openai_tools(self) -> list[dict]:
result = []
for tool in self._tools.values():
result.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.input_schema,
},
})
return result
def to_claude_tools(self) -> list[dict]:
result = []
for tool in self._tools.values():
result.append({
"name": tool.name,
"description": tool.description,
"input_schema": tool.input_schema,
})
return result
def to_gemini_tools(self) -> list[dict]:
result = []
for tool in self._tools.values():
params = tool.input_schema.get("properties", {})
required = tool.input_schema.get("required", [])
declarations = []
for param_name, param_schema in params.items():
declarations.append({
"name": param_name,
"description": param_schema.get("description", ""),
"type": self._json_type_to_gemini(param_schema.get("type", "string")),
"required": param_name in required,
})
result.append({
"name": tool.name,
"description": tool.description,
"parameters": {
"type": "OBJECT",
"properties": {d["name"]: {"type": d["type"]} for d in declarations},
"required": [d["name"] for d in declarations if d["required"]],
},
})
return result
@staticmethod
def _json_type_to_gemini(json_type: str) -> str:
mapping = {
"string": "STRING",
"integer": "INTEGER",
"number": "NUMBER",
"boolean": "BOOLEAN",
"array": "ARRAY",
"object": "OBJECT",
}
return mapping.get(json_type, "STRING")
class ToolDiscovery:
def __init__(self, client):
self._client = client
self._registry = ToolRegistry()
self._client.on_notification(
"notifications/tools/list_changed",
self._on_tools_changed,
)
@property
def registry(self) -> ToolRegistry:
return self._registry
async def discover(self) -> list[McpTool]:
result = await self._client.send_request("tools/list")
tools = []
for tool_data in result.get("tools", []):
tool = McpTool.from_mcp(tool_data)
self._registry.register(tool)
tools.append(tool)
logger.info(f"Discovered {len(tools)} tools")
return tools
async def _on_tools_changed(self, params: dict):
logger.info("Tools list changed, re-discovering...")
self._registry._tools.clear()
await self.discover()
async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
tool = self._registry.get(name)
if not tool:
raise ValueError(f"Tool not found: {name}")
result = await self._client.send_request("tools/call", {
"name": name,
"arguments": arguments,
})
contents = result.get("content", [])
text_parts = []
for content in contents:
if content.get("type") == "text":
text_parts.append(content["text"])
elif content.get("type") == "image":
text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
elif content.get("type") == "resource":
text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")
return "\n".join(text_parts) if text_parts else str(result)
動的登録の使用
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
tools = await discovery.discover()
print(f"Discovered {len(tools)} tools:")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
openai_tools = discovery.registry.to_openai_tools()
print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2, ensure_ascii=False)}")
claude_tools = discovery.registry.to_claude_tools()
print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2, ensure_ascii=False)}")
result = await discovery.call_tool("search_docs", {"query": "MCPプロトコル解析"})
print(f"\nTool result: {result}")
await client.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Pattern 4: マルチツールシーケンシャルオーケストレーション
問題:AI Agentが複数のツールを順番に呼び出す必要があり、後のツールの入力が前のツールの出力に依存する場合、このようなシーケンシャル依存チェーンをどのようにエレガントに管理するか?
解決策:依存性注入、中間結果の受け渡し、条件分岐をサポートするシーケンシャルオーケストレーターを実装します。
シーケンシャルオーケストレーションフロー
ユーザー入力: "MCPドキュメントを検索して要約を翻訳"
│
▼
┌──────────┐ result ┌──────────┐ result ┌──────────┐
│ Tool 1 │ ────────────> │ Tool 2 │ ────────────> │ Tool 3 │
│ 検索 │ │ 要約抽出 │ │ 翻訳 │
└──────────┘ └──────────┘ └──────────┘
│ │ │
│ $search_result │ $summary_text │ $translated
▼ ▼ ▼
Context Store Context Store Context Store
完全な実装
import asyncio
import json
import re
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class StepResult:
step_name: str
tool_name: str
input_args: dict[str, Any]
output: Any
success: bool = True
error: str | None = None
duration_ms: float = 0.0
@dataclass
class PipelineStep:
name: str
tool_name: str
arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
condition: Callable[[dict[str, Any]], bool] | None = None
timeout: float = 30.0
retry_count: int = 0
def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
if callable(self.arguments):
return self.arguments(context)
resolved = {}
for key, value in self.arguments.items():
if isinstance(value, str) and value.startswith("$"):
ref_path = value[1:]
resolved[key] = _deep_get(context, ref_path)
else:
resolved[key] = value
return resolved
def _deep_get(data: dict, path: str) -> Any:
keys = path.split(".")
current = data
for key in keys:
if isinstance(current, dict):
current = current.get(key)
elif isinstance(current, list) and key.isdigit():
idx = int(key)
current = current[idx] if idx < len(current) else None
else:
return None
return current
class SequentialOrchestrator:
def __init__(self, discovery: ToolDiscovery):
self._discovery = discovery
self._steps: list[PipelineStep] = []
self._context: dict[str, Any] = {}
def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
self._steps.append(step)
return self
def set_context(self, key: str, value: Any):
self._context[key] = value
async def execute(self) -> list[StepResult]:
results = []
context = dict(self._context)
for step in self._steps:
if step.condition and not step.condition(context):
logger.info(f"Skipping step '{step.name}': condition not met")
continue
resolved_args = step.resolve_arguments(context)
logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")
import time
start = time.monotonic()
attempt = 0
last_error = None
while attempt <= step.retry_count:
try:
output = await asyncio.wait_for(
self._discovery.call_tool(step.tool_name, resolved_args),
timeout=step.timeout,
)
duration = (time.monotonic() - start) * 1000
context[f"steps.{step.name}"] = output
context["last_result"] = output
results.append(StepResult(
step_name=step.name,
tool_name=step.tool_name,
input_args=resolved_args,
output=output,
success=True,
duration_ms=duration,
))
break
except Exception as e:
last_error = str(e)
attempt += 1
if attempt <= step.retry_count:
logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
await asyncio.sleep(1.0 * attempt)
else:
duration = (time.monotonic() - start) * 1000
results.append(StepResult(
step_name=step.name,
tool_name=step.tool_name,
input_args=resolved_args,
output=None,
success=False,
error=last_error,
duration_ms=duration,
))
break
return results
使用例
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
orchestrator = SequentialOrchestrator(discovery)
orchestrator.set_context("user_query", "MCPプロトコル解析とPythonクライアント統合")
orchestrator.add_step(PipelineStep(
name="search",
tool_name="search_docs",
arguments={"query": "$user_query", "limit": 5},
))
orchestrator.add_step(PipelineStep(
name="extract",
tool_name="extract_summary",
arguments={"text": "$steps.search"},
condition=lambda ctx: bool(ctx.get("steps.search")),
))
orchestrator.add_step(PipelineStep(
name="translate",
tool_name="translate_text",
arguments={"text": "$steps.extract", "target_lang": "en"},
condition=lambda ctx: bool(ctx.get("steps.extract")),
))
results = await orchestrator.execute()
for r in results:
status = "OK" if r.success else "FAIL"
print(f"[{status}] [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
if r.error:
print(f" Error: {r.error}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 5: マルチツールパラレルオーケストレーション
問題:複数のツール呼び出し間に依存関係がない場合、シーケンシャル実行は時間を無駄にします。複数のMCPツールを安全に並列呼び出しし、結果を集約するには?
解決策:asyncioベースの並行制御、結果集約、DAG依存実行をサポートするパラレルオーケストレーターを実装します。
パラレルオーケストレーションアーキテクチャ
┌──────────┐
│ リクエスト│
└────┬─────┘
│
┌──────────┼──────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Tool A │ │ Tool B │ │ Tool C │
│ 検索 │ │ DB照会 │ │ API呼出 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└──────────┬──┘────────────┘
│
▼
┌─────────────┐
│ 結果集約 │
└─────────────┘
完全な実装
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class ParallelTask:
name: str
tool_name: str
arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
depends_on: list[str] = field(default_factory=list)
timeout: float = 30.0
retry_count: int = 0
@dataclass
class TaskResult:
name: str
tool_name: str
output: Any
success: bool = True
error: str | None = None
duration_ms: float = 0.0
class ParallelOrchestrator:
def __init__(self, discovery: ToolDiscovery, max_concurrency: int = 5):
self._discovery = discovery
self._max_concurrency = max_concurrency
self._tasks: list[ParallelTask] = []
def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
self._tasks.append(task)
return self
async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
context = dict(initial_context or {})
results: dict[str, TaskResult] = {}
semaphore = asyncio.Semaphore(self._max_concurrency)
completed = set()
remaining = list(self._tasks)
while remaining:
ready = [
t for t in remaining
if all(dep in completed for dep in t.depends_on)
]
if not ready and remaining:
logger.error("Circular dependency detected")
break
coros = [self._execute_task(task, context, results, semaphore) for task in ready]
task_results = await asyncio.gather(*coros, return_exceptions=True)
for task, result in zip(ready, task_results):
if isinstance(result, Exception):
results[task.name] = TaskResult(
name=task.name,
tool_name=task.tool_name,
output=None,
success=False,
error=str(result),
)
else:
results[task.name] = result
context[f"tasks.{task.name}"] = result.output
completed.add(task.name)
remaining.remove(task)
return results
async def _execute_task(
self,
task: ParallelTask,
context: dict[str, Any],
results: dict[str, TaskResult],
semaphore: asyncio.Semaphore,
) -> TaskResult:
async with semaphore:
resolved_args = self._resolve_arguments(task, context, results)
import time
start = time.monotonic()
attempt = 0
last_error = None
while attempt <= task.retry_count:
try:
output = await asyncio.wait_for(
self._discovery.call_tool(task.tool_name, resolved_args),
timeout=task.timeout,
)
duration = (time.monotonic() - start) * 1000
return TaskResult(
name=task.name,
tool_name=task.tool_name,
output=output,
success=True,
duration_ms=duration,
)
except Exception as e:
last_error = str(e)
attempt += 1
if attempt <= task.retry_count:
await asyncio.sleep(0.5 * attempt)
duration = (time.monotonic() - start) * 1000
return TaskResult(
name=task.name,
tool_name=task.tool_name,
output=None,
success=False,
error=last_error,
duration_ms=duration,
)
def _resolve_arguments(
self,
task: ParallelTask,
context: dict[str, Any],
results: dict[str, TaskResult],
) -> dict[str, Any]:
if callable(task.arguments):
return task.arguments(context)
resolved = {}
for key, value in task.arguments.items():
if isinstance(value, str) and value.startswith("$"):
ref_path = value[1:]
if ref_path.startswith("tasks."):
task_name = ref_path.split(".")[1]
if task_name in results:
resolved[key] = results[task_name].output
else:
resolved[key] = _deep_get(context, ref_path)
else:
resolved[key] = value
return resolved
class DagOrchestrator(ParallelOrchestrator):
def validate_dag(self) -> list[str]:
errors = []
task_names = {t.name for t in self._tasks}
for task in self._tasks:
for dep in task.depends_on:
if dep not in task_names:
errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")
visited = set()
path = set()
def has_cycle(name: str) -> bool:
visited.add(name)
path.add(name)
task = next((t for t in self._tasks if t.name == name), None)
if task:
for dep in task.depends_on:
if dep in path:
return True
if dep not in visited and has_cycle(dep):
return True
path.remove(name)
return False
for task in self._tasks:
if task.name not in visited:
if has_cycle(task.name):
errors.append("Circular dependency detected in DAG")
break
return errors
使用例
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)
orchestrator.add_task(ParallelTask(
name="search_web",
tool_name="web_search",
arguments={"query": "MCPプロトコル 2026"},
))
orchestrator.add_task(ParallelTask(
name="search_docs",
tool_name="search_docs",
arguments={"query": "Model Context Protocol Python"},
))
orchestrator.add_task(ParallelTask(
name="query_db",
tool_name="query_database",
arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
))
orchestrator.add_task(ParallelTask(
name="merge_results",
tool_name="merge_and_deduplicate",
arguments={"sources": "$tasks.search_web,sources: $tasks.search_docs,sources: $tasks.query_db"},
depends_on=["search_web", "search_docs", "query_db"],
))
results = await orchestrator.execute({"user_query": "MCPクライアント統合"})
for name, result in results.items():
status = "OK" if result.success else "FAIL"
print(f"[{status}] [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
if result.error:
print(f" Error: {result.error}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 6: エラー処理とリトライ
問題:MCPクライアントは、ネットワーク不安定、サーバー過負荷、ツール実行例外時に堅牢なエラー処理とリトライメカニズムが必要です。
解決策:指数バックオフリトライ、サーキットブレーカー、フォールバック戦略による階層型エラー処理を実装します。
エラー処理アーキテクチャ
┌────────────────────────────────────────────┐
│ Error Handling Layer │
│ │
│ ┌──────────┐ ┌──────────┐ ┌─────────┐ │
│ │ Retry │ │ Circuit │ │ Fallback│ │
│ │ Strategy │ │ Breaker │ │ Handler │ │
│ └──────────┘ └──────────┘ └─────────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Error Classification │ │
│ │ Transient / Permanent / Unknown │ │
│ └──────────────────────────────────────┘ │
└────────────────────────────────────────────┘
完全な実装
import asyncio
import logging
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
class ErrorCategory(Enum):
TRANSIENT = "transient"
PERMANENT = "permanent"
UNKNOWN = "unknown"
@dataclass
class RetryConfig:
max_attempts: int = 3
base_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
retryable_errors: list[int] = field(default_factory=lambda: [
-32603, # Internal error
-32000, # Server error
])
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_max_calls: int = 3
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, config: CircuitBreakerConfig | None = None):
self._config = config or CircuitBreakerConfig()
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: float = 0
self._half_open_calls = 0
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return self._state
def allow_request(self) -> bool:
state = self.state
if state == CircuitState.CLOSED:
return True
if state == CircuitState.HALF_OPEN:
if self._half_open_calls < self._config.half_open_max_calls:
self._half_open_calls += 1
return True
return False
return False
def record_success(self):
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self._config.half_open_max_calls:
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
else:
self._failure_count = 0
def record_failure(self):
self._failure_count += 1
self._last_failure_time = time.monotonic()
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.OPEN
self._success_count = 0
elif self._failure_count >= self._config.failure_threshold:
self._state = CircuitState.OPEN
class ResilientMcpClient:
def __init__(
self,
client,
retry_config: RetryConfig | None = None,
circuit_config: CircuitBreakerConfig | None = None,
):
self._client = client
self._retry_config = retry_config or RetryConfig()
self._circuit_breakers: dict[str, CircuitBreaker] = {}
self._circuit_config = circuit_config or CircuitBreakerConfig()
self._fallback_handlers: dict[str, Callable] = {}
def _get_breaker(self, server_name: str) -> CircuitBreaker:
if server_name not in self._circuit_breakers:
self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
return self._circuit_breakers[server_name]
def register_fallback(self, tool_name: str, handler: Callable):
self._fallback_handlers[tool_name] = handler
def classify_error(self, error: Exception) -> ErrorCategory:
if isinstance(error, (ConnectionError, asyncio.TimeoutError, OSError)):
return ErrorCategory.TRANSIENT
if isinstance(error, McpError):
if error.code in self._retry_config.retryable_errors:
return ErrorCategory.TRANSIENT
if error.code in [-32600, -32601, -32602]:
return ErrorCategory.PERMANENT
return ErrorCategory.UNKNOWN
async def call_with_retry(
self,
tool_name: str,
arguments: dict[str, Any],
server_name: str = "default",
) -> Any:
breaker = self._get_breaker(server_name)
if not breaker.allow_request():
fallback = self._fallback_handlers.get(tool_name)
if fallback:
logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
return await fallback(arguments)
raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")
last_error = None
for attempt in range(1, self._retry_config.max_attempts + 1):
try:
result = await self._client.call_tool(tool_name, arguments)
breaker.record_success()
return result
except Exception as e:
last_error = e
category = self.classify_error(e)
if category == ErrorCategory.PERMANENT:
breaker.record_failure()
raise
if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
delay = min(
self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
self._retry_config.max_delay,
)
if self._retry_config.jitter:
import random
delay *= (0.5 + random.random() * 0.5)
logger.warning(
f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
f"retrying in {delay:.1f}s: {e}"
)
await asyncio.sleep(delay)
else:
breaker.record_failure()
if fallback := self._fallback_handlers.get(tool_name):
logger.warning(f"All retries exhausted for '{tool_name}', using fallback")
return await fallback(arguments)
raise last_error
class CircuitOpenError(Exception):
pass
使用例
async def fallback_search(arguments: dict) -> str:
return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
resilient = ResilientMcpClient(
discovery,
retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
)
resilient.register_fallback("search_docs", fallback_search)
try:
result = await resilient.call_with_retry("search_docs", {"query": "MCPプロトコル解析"})
print(f"Result: {result}")
except CircuitOpenError as e:
print(f"Service unavailable: {e}")
except Exception as e:
print(f"Failed: {e}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 7: ストリーミングレスポンス処理
問題:長時間実行されるツール呼び出しにはリアルタイムの進捗フィードバックが必要です。MCPプロトコルはSSEとNotificationによるストリーミングレスポンスをサポートしています。
解決策:進捗通知、部分結果集約、キャンセル操作をサポートするストリーミングレスポンスハンドラーを実装します。
ストリーミングレスポンスアーキテクチャ
┌──────────┐ tools/call ┌──────────┐
│ Client │ ────────────> │ Server │
│ │ │ │
│ Stream │ <── notify ── │ Tool │
│ Handler │ progress │ Running │
│ │ <── notify ── │ │
│ │ progress │ │
│ │ <── result ── │ │
│ │ complete │ │
└──────────┘ └──────────┘
完全な実装
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum
logger = logging.getLogger(__name__)
class StreamEventType(Enum):
PROGRESS = "progress"
PARTIAL_RESULT = "partial_result"
COMPLETE = "complete"
ERROR = "error"
LOG = "log"
@dataclass
class StreamEvent:
type: StreamEventType
data: Any
progress: float | None = None
message: str | None = None
timestamp: float = field(default_factory=lambda: asyncio.get_event_loop().time())
@dataclass
class StreamResult:
events: list[StreamEvent] = field(default_factory=list)
final_result: Any = None
success: bool = True
error: str | None = None
total_duration_ms: float = 0.0
class StreamHandler:
def __init__(self):
self._progress_handlers: list[Callable[[StreamEvent], Any]] = []
self._partial_handlers: list[Callable[[StreamEvent], Any]] = []
self._complete_handlers: list[Callable[[StreamEvent], Any]] = []
self._error_handlers: list[Callable[[StreamEvent], Any]] = []
self._log_handlers: list[Callable[[StreamEvent], Any]] = []
def on_progress(self, handler: Callable[[StreamEvent], Any]):
self._progress_handlers.append(handler)
def on_partial_result(self, handler: Callable[[StreamEvent], Any]):
self._partial_handlers.append(handler)
def on_complete(self, handler: Callable[[StreamEvent], Any]):
self._complete_handlers.append(handler)
def on_error(self, handler: Callable[[StreamEvent], Any]):
self._error_handlers.append(handler)
def on_log(self, handler: Callable[[StreamEvent], Any]):
self._log_handlers.append(handler)
async def handle_notification(self, params: dict[str, Any]):
method = params.get("method", "")
if method == "notifications/progress":
progress_data = params.get("progress", {})
event = StreamEvent(
type=StreamEventType.PROGRESS,
data=progress_data,
progress=progress_data.get("progress", 0) / max(progress_data.get("total", 1), 1),
message=progress_data.get("message"),
)
for handler in self._progress_handlers:
try:
result = handler(event)
if asyncio.iscoroutine(result):
await result
except Exception as e:
logger.error(f"Progress handler error: {e}")
elif method == "notifications/message":
level = params.get("level", "info")
event = StreamEvent(
type=StreamEventType.LOG,
data=params.get("data"),
message=f"[{level}] {params.get('data', {}).get('text', '')}",
)
for handler in self._log_handlers:
try:
result = handler(event)
if asyncio.iscoroutine(result):
await result
except Exception as e:
logger.error(f"Log handler error: {e}")
class StreamingMcpClient:
def __init__(self, client, discovery: ToolDiscovery):
self._client = client
self._discovery = discovery
self._stream_handler = StreamHandler()
self._active_calls: dict[str, asyncio.Task] = {}
client.on_notification(
"notifications/progress",
self._stream_handler.handle_notification,
)
client.on_notification(
"notifications/message",
self._stream_handler.handle_notification,
)
@property
def stream(self) -> StreamHandler:
return self._stream_handler
async def call_with_stream(
self,
tool_name: str,
arguments: dict[str, Any],
timeout: float = 120.0,
) -> StreamResult:
import time
start = time.monotonic()
result = StreamResult()
self._stream_handler.on_complete(lambda e: setattr(result, "final_result", e.data))
self._stream_handler.on_error(lambda e: setattr(result, "error", e.message))
try:
call_result = await asyncio.wait_for(
self._discovery.call_tool(tool_name, arguments),
timeout=timeout,
)
result.final_result = call_result
result.success = True
result.events.append(StreamEvent(
type=StreamEventType.COMPLETE,
data=call_result,
))
except asyncio.TimeoutError:
result.success = False
result.error = f"Tool call timed out after {timeout}s"
result.events.append(StreamEvent(
type=StreamEventType.ERROR,
data=None,
message=result.error,
))
except Exception as e:
result.success = False
result.error = str(e)
result.events.append(StreamEvent(
type=StreamEventType.ERROR,
data=None,
message=str(e),
))
result.total_duration_ms = (time.monotonic() - start) * 1000
return result
async def cancel(self, request_id: int | str):
await self._client.send_notification("notifications/cancelled", {
"requestId": request_id,
"reason": "User cancelled",
})
使用例
async def main():
client = JsonRpcClient(timeout=120.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
streaming = StreamingMcpClient(client, discovery)
streaming.stream.on_progress(lambda e: print(f" Progress: {e.progress:.0%} - {e.message or ''}"))
streaming.stream.on_log(lambda e: print(f" Log: {e.message}"))
streaming.stream.on_complete(lambda e: print(f" Complete!"))
streaming.stream.on_error(lambda e: print(f" Error: {e.message}"))
print("Calling long-running tool with streaming...")
result = await streaming.call_with_stream(
"batch_process",
{"items": list(range(100)), "operation": "analyze"},
timeout=120.0,
)
print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
if result.final_result:
print(f"Output: {str(result.final_result)[:200]}...")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
5つのよくある落とし穴と解決策
1. 初期化ハンドシェイクでのinitialized通知の漏れ
落とし穴:initializeリクエスト送信後、notifications/initialized通知の送信を忘れる。Serverはハンドシェイク完了を待ち続け、後続のリクエストがすべてタイムアウトする。
解決策:
async def safe_initialize(client: JsonRpcClient) -> dict:
result = await client.send_request("initialize", {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {"name": "my-client", "version": "1.0.0"},
})
# initialized通知を必ず送信
await client.send_notification("notifications/initialized")
return result
2. SSEエンドポイント準備前のリクエスト送信
落とし穴:SSE接続確立後、endpointイベントを受信する前にJSON-RPCリクエストを即座に送信し、間違ったURLにリクエストが送られる。
解決策:
class SafeSseClient:
def __init__(self, server_url: str):
self._transport = SseTransport(server_url)
self._endpoint_ready = asyncio.Event()
self._transport.on_event("endpoint", self._on_endpoint)
async def _on_endpoint(self, data):
self._endpoint_ready.set()
async def send_request(self, method: str, params: dict | None = None) -> Any:
await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
# これで安全にリクエスト送信可能
...
3. ツールスキーマ変換でのrequiredフィールドの欠落
落とし穴:MCPツールスキーマをOpenAI function calling形式に変換する際、inputSchemaをそのままparametersとして使用するが、一部のLLMはrequiredフィールドの存在を要求する。
解決策:
def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
schema = dict(mcp_tool.input_schema)
if "required" not in schema:
schema["required"] = []
if "type" not in schema:
schema["type"] = "object"
return {
"type": "function",
"function": {
"name": mcp_tool.name,
"description": mcp_tool.description,
"parameters": schema,
},
}
4. パラレル呼び出し時の共有状態の競合
落とし穴:複数の並列ツール呼び出しが同じhttpx.AsyncClientやasyncio.Future辞書を共有し、レスポンスの誤一致が発生する。
解決策:各並列呼び出しで独立したリクエストID空間を使用するか、接続プールの分離を行う:
class IsolatedClientPool:
def __init__(self, base_url: str, pool_size: int = 5):
self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
self._index = 0
self._lock = asyncio.Lock()
async def get_client(self) -> McpSseClient:
async with self._lock:
client = self._clients[self._index % len(self._clients)]
self._index += 1
return client
5. ストリーミングレスポンスでの進捗通知と最終結果の混同
落とし穴:SSEチャネルで進捗通知と最終結果が同じストリームを共有し、クライアントが進捗通知を最終結果として処理してしまう。
解決策:メッセージタイプを厳密に区別し、idフィールドを持つレスポンスのみをリクエスト結果として処理する:
async def _handle_sse_message(self, message: dict):
if "id" in message:
# これはリクエストレスポンス、保留中のfutureと照合
future = self._pending.pop(message["id"], None)
if future and not future.done():
future.set_result(message.get("result"))
elif "method" in message:
# これは通知、ストリームハンドラーに転送
await self._stream_handler.handle_notification(message)
10のよくあるエラートラブルシューティング
| エラーメッセージ | 原因 | 解決策 |
|---|---|---|
MCP Error [-32600]: Invalid Request |
JSON-RPCメッセージ形式エラー、必須フィールドの欠落 | リクエストにjsonrpc、method、idフィールドが含まれているか確認。/ja/json/formatで形式を検証 |
MCP Error [-32601]: Method not found |
サーバーがサポートしていないメソッドの呼び出し | 先にinitializeでサーバーの機能を確認。メソッド名のスペルチェック |
MCP Error [-32602]: Invalid params |
ツールパラメータの型や構造の不一致 | tools/listで正しいinputSchemaを取得し、パラメータを照合 |
Connection refused |
MCP Serverが起動していない、またはポートが間違っている | Serverプロセスの実行を確認。ポートとURL設定をチェック |
SSE connection timeout |
ネットワークが到達不能、またはServerがSSEハンドシェイクに応答しない | ファイアウォールルールを確認。SSEエンドポイントの到達可能性を確認 |
Request timed out after 30s |
ツール実行がクライアントのタイムアウト設定を超過 | タイムアウトパラメータを増やすか、ストリーミングレスポンスを使用 |
Circuit breaker open |
連続失敗回数がサーキットブレーカーの閾値を超過 | Serverの健全性を確認。リカバリタイムアウト後にリトライ |
Tool not found: xxx |
未登録のツール名の呼び出し | tools/listを再実行してディスカバリ。ツール名の大文字小文字を確認 |
Endpoint not received |
SSE接続後にendpointイベントを受信していない | Serverが正しくendpointイベントを送信しているか確認。SSEルート設定をチェック |
JSON decode error in SSE stream |
SSEデータが有効なJSONではない | Server側のSSEイベント形式を確認。dataフィールドが有効なJSONであることを確認 |
高度な最適化テクニック
1. 接続プールとマルチServer管理
本番環境では、AI Agentは通常複数のMCP Serverに同時接続する必要があります。接続プールを使用して複数のクライアントインスタンスを管理します:
class McpConnectionPool:
def __init__(self):
self._connections: dict[str, Any] = {}
self._discovery_map: dict[str, ToolDiscovery] = {}
async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio(command, args)
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
self._connections[name] = client
self._discovery_map[name] = discovery
async def connect_sse(self, name: str, url: str):
client = McpSseClient(url)
await client.connect()
await client.initialize()
discovery = ToolDiscovery(client)
await discovery.discover()
self._connections[name] = client
self._discovery_map[name] = discovery
def get_discovery(self, name: str) -> ToolDiscovery | None:
return self._discovery_map.get(name)
async def call_tool(self, tool_name: str, arguments: dict) -> Any:
for name, discovery in self._discovery_map.items():
if discovery.registry.get(tool_name):
return await discovery.call_tool(tool_name, arguments)
raise ValueError(f"Tool '{tool_name}' not found in any connected server")
async def close_all(self):
for client in self._connections.values():
await client.close()
2. ツール呼び出しキャッシュ
冪等なツール呼び出し結果をキャッシュし、重複リクエストを削減します:
import hashlib
import json
from typing import Any
class ToolCallCache:
def __init__(self, ttl: float = 300.0, max_size: int = 1000):
self._cache: dict[str, tuple[float, Any]] = {}
self._ttl = ttl
self._max_size = max_size
def _make_key(self, tool_name: str, arguments: dict) -> str:
raw = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True)
return hashlib.sha256(raw.encode()).hexdigest()
def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
key = self._make_key(tool_name, arguments)
if key in self._cache:
ts, value = self._cache[key]
if asyncio.get_event_loop().time() - ts < self._ttl:
return True, value
del self._cache[key]
return False, None
def set(self, tool_name: str, arguments: dict, value: Any):
if len(self._cache) >= self._max_size:
oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
del self._cache[oldest_key]
key = self._make_key(tool_name, arguments)
self._cache[key] = (asyncio.get_event_loop().time(), value)
def invalidate(self, tool_name: str, arguments: dict | None = None):
if arguments:
key = self._make_key(tool_name, arguments)
self._cache.pop(key, None)
else:
prefix = hashlib.sha256(json.dumps({"tool": tool_name}).encode()).hexdigest()[:8]
keys_to_remove = [k for k in self._cache if k.startswith(prefix)]
for k in keys_to_remove:
del self._cache[k]
3. オブザーバビリティとトレーシング
MCPクライアント呼び出しにOpenTelemetryトレーシングを追加します:
from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")
tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram("mcp.tool.duration", description="MCP tool call duration")
class ObservableMcpClient:
def __init__(self, discovery: ToolDiscovery):
self._discovery = discovery
async def call_tool(self, tool_name: str, arguments: dict) -> Any:
with tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
span.set_attribute("mcp.tool.name", tool_name)
span.set_attribute("mcp.tool.args_count", len(arguments))
import time
start = time.monotonic()
try:
result = await self._discovery.call_tool(tool_name, arguments)
duration = time.monotonic() - start
span.set_status(Status(StatusCode.OK))
tool_call_counter.add(1, {"tool": tool_name, "status": "success"})
tool_call_duration.record(duration, {"tool": tool_name})
return result
except Exception as e:
duration = time.monotonic() - start
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
tool_call_counter.add(1, {"tool": tool_name, "status": "error"})
tool_call_duration.record(duration, {"tool": tool_name})
raise
比較分析:3つのオーケストレーションアプローチ
| 次元 | シーケンシャル | パラレル | DAG |
|---|---|---|---|
| 実行方式 | 順次実行、前のステップ完了後に次を実行 | 全タスクを同時実行 | 依存関係に基づくトポロジカルソートで実行 |
| 適用シナリオ | ツール間に厳密なデータ依存がある場合 | ツール間が完全に独立している場合 | 一部依存、一部独立 |
| レイテンシ | 全ツール所要時間の合計 | 最も遅いツールの所要時間 | クリティカルパス上の所要時間の合計 |
| エラー伝播 | 前のステップの失敗で後続を全スキップ | 単一の失敗は他に影響しない | 失敗ノードの下流をスキップ |
| 実装複雑度 | 低 | 中 | 高 |
| リソース消費 | 低(同時に1つの接続のみ使用) | 高(並行接続プールが必要) | 中(レイヤーごとの並行) |
| 結果の一貫性 | 強(決定的な順序) | 弱(非決定的な完了順序) | 中(同じレイヤー内は非決定的) |
| タイムアウト制御 | ステップごとに独立したタイムアウト | グローバル+タスクごとのタイムアウト | ステップごと+グローバルタイムアウト |
| 典型的シナリオ | 検索→抽出→翻訳 | 3つのデータソースを同時検索 | 検索A+B→マージ→翻訳 |
選択ガイド:
- ツール間に厳密な因果依存がある → シーケンシャルオーケストレーション
- ツール間が完全に独立 → パラレルオーケストレーション
- ツール間に部分的な依存がある → DAGオーケストレーション
AI Agentワークフローオーケストレーションの詳細については、AI AgentワークフローDAGオーケストレーション と AI Agentプロトコル比較 をご覧ください。
オンラインツール推奨
- JSONフォーマッター:MCPプロトコルメッセージのデバッグ時に、/ja/json/format でJSON-RPCリクエストとレスポンスをフォーマット
- cURL to Code:MCP Serverエンドポイントのテスト時に、/ja/dev/curl-to-code でPythonクライアントコードを素早く生成
- Base64エンコード/デコード:MCPトランスポートのバイナリデータ処理時に、/ja/encode/base64 でエンコード/デコード
まとめ:Python MCPクライアント統合は、単にJSON-RPCリクエストを送信するだけではありません。基本的なstdio接続からSSEリモートトランスポートまで、ツールディスカバリと動的登録からマルチツールのシーケンシャル/パラレル/DAGオーケストレーションまで、サーキットブレーカーとフォールバックによるエラー処理からストリーミングレスポンスと進捗追跡まで—あらゆる側面で本番レベルのエンジニアリングが求められます。この7つのパターンをマスターすれば、AI Agentは信頼性が高く、効率的で、観測可能なツール呼び出し能力を持つことになります。覚えておいてください:MCPプロトコル解析は基盤、マルチツールオーケストレーションは核心、エラー処理は保障、ストリーミングレスポンスは体験です。最新のプロトコル定義については、公式の Model Context Protocol Specification を参照してください。
ブラウザローカルツールを無料で試す →