Python MCPクライアント統合実践:プロトコル解析からマルチツールオーケストレーションまでの7つの本番パターン

AI与大数据

なぜAIアプリケーションは外部ツールに接続できないのか?MCPクライアント統合の5つのペインポイント

3日かけて完璧なMCP Serverを構築したのに、Claude Desktopが接続できない?AI Agentが3つのツールを呼び出し、最初の1つがタイムアウトすると残りも全て失敗する?ツールディスカバリが大量のスキーマを返すが、LLMで使えるfunction calling形式に変換する方法がわからない?SSE接続が頻繁に切断・再接続され、データロスに気づかない?マルチツールオーケストレーションでシーケンシャルは遅すぎ、パラレルはカオスになる?

これらの問題の根本原因は:ほとんどの開発者がMCP Serverの開発にのみ注目し、Client側の統合の複雑さを無視していることです。MCPプロトコルは標準通信方式を定義していますが、クライアントはJSON-RPCメッセージ構築、トランスポート適応、ツールディスカバリと変換、オーケストレーション戦略、エラーリカバリなど、一連のエンジニアリング課題を処理する必要があります。

主要ポイント

  • MCPプロトコルJSON-RPC 2.0通信メカニズムをマスターし、信頼性の高いクライアント・サーバー接続を実現
  • ツールディスカバリと動的登録を学び、AI Agentが新しいツールに自動適応できるようにする
  • マルチツールオーケストレーションの7つの本番パターンを理解し、シーケンシャル呼び出しからパラレルDAG実行まで
  • ネットワークジッタやサービス利用不可に対処する堅牢なエラー処理とリトライメカニズムを構築
  • SSEストリーミングレスポンス処理をマスターし、リアルタイムのツール呼び出し進捗フィードバックを実現

目次ナビゲーション


MCPプロトコルコア概念

7つの本番パターンに深く入る前に、MCPクライアントがプロトコルスタックのどこに位置し、どのようなコア概念があるかを理解する必要があります。

MCPクライアントアーキテクチャ概要

┌─────────────────────────────────────────────────┐
│                  AI Application                  │
│  (Claude Desktop / ChatGPT / カスタムAgent)      │
├─────────────────────────────────────────────────┤
│               MCP Client Layer                   │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ JSON-RPC  │ │Transport │ │ Tool Registry  │  │
│  │ Handler   │ │ Adapter  │ │ & Discovery    │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ Orchest-  │ │ Error &  │ │ Stream         │  │
│  │ ration    │ │ Retry    │ │ Handler        │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
├─────────────────────────────────────────────────┤
│              Transport Layer                     │
│     stdio / SSE / Streamable HTTP               │
├─────────────────────────────────────────────────┤
│              MCP Server                          │
│  Tools / Resources / Prompts                    │
└─────────────────────────────────────────────────┘

コア概念リファレンス

概念 説明 JSON-RPCメソッド 例え
Initialize クライアント・サーバーハンドシェイク initialize TCP 3ウェイハンドシェイク
Tool Discovery サーバーがサポートするツールリストの取得 tools/list DNSルックアップ
Tool Call 特定のツールを呼び出して結果を取得 tools/call HTTP POST
Resource Read サーバーが公開するリソースの読み取り resources/read HTTP GET
Prompt Get 定義済みプロンプトテンプレートの取得 prompts/get テンプレートエンジン
Notification 一方向通知、応答不要 idフィールドなし UDPブロードキャスト
Cancel 進行中のリクエストのキャンセル notifications/cancelled HTTPキャンセル

JSON-RPC 2.0メッセージフォーマット

MCPはJSON-RPC 2.0に基づいています。メッセージフォーマットの理解はクライアント開発の基礎です:

// リクエストメッセージ
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "search_docs",
    "arguments": {"query": "MCPプロトコル"}
  },
  "id": 1
}

// 成功レスポンス
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {"type": "text", "text": "検索結果..."}
    ]
  },
  "id": 1
}

// エラーレスポンス
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32600,
    "message": "Invalid Request"
  },
  "id": 1
}

MCP Server開発に興味がある場合は、以前の記事 Python MCP Server開発ガイド をご覧ください。AI関数呼び出しプロトコルのより深い理解については、AI Function Calling本番実践 をお勧めします。


Pattern 1: 基本JSON-RPCクライアント

問題:AIアプリケーションがMCP Serverに接続する必要があるが、正しいJSON-RPCメッセージの構築、リクエストIDの管理、レスポンスの処理方法がわからない。

解決策:メッセージ構築、送信、レスポンスマッチング、タイムアウト処理をカプセル化する完全なJSON-RPC 2.0クライアントを構築します。

完全な実装

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class JsonRpcRequest:
    jsonrpc: str = "2.0"
    method: str = ""
    params: dict[str, Any] = field(default_factory=dict)
    id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)


@dataclass
class JsonRpcResponse:
    id: int | str
    result: Any = None
    error: dict[str, Any] | None = None


class JsonRpcClient:
    def __init__(self, timeout: float = 30.0):
        self._request_id = 0
        self._pending: dict[int | str, asyncio.Future] = {}
        self._timeout = timeout
        self._notification_handlers: dict[str, Callable] = {}
        self._writer: asyncio.StreamWriter | None = None
        self._reader: asyncio.StreamReader | None = None

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
        import os
        proc_env = {**os.environ, **(env or {})}
        self._process = await asyncio.create_subprocess_exec(
            command,
            *(args or []),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=proc_env,
        )
        self._reader = self._process.stdout
        self._writer = self._process.stdin
        asyncio.create_task(self._read_loop())

    async def _read_loop(self):
        while self._reader and not self._reader.at_eof():
            line = await self._reader.readline()
            if not line:
                continue
            try:
                message = json.loads(line.decode().strip())
                await self._handle_message(message)
            except json.JSONDecodeError:
                continue

    async def _handle_message(self, message: dict):
        if "id" in message:
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(
                        McpError(message["error"]["code"], message["error"]["message"])
                    )
                else:
                    future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))
        elif "method" in message:
            handler = self._notification_handlers.get(message["method"])
            if handler:
                await handler(message.get("params", {}))

    async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        request_id = self._next_id()
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_event_loop().create_future()
        self._pending[request_id] = future

        data = json.dumps(request) + "\n"
        if self._writer:
            self._writer.write(data.encode())
            await self._writer.drain()

        try:
            response = await asyncio.wait_for(future, timeout=self._timeout)
            return response.result
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)
            raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s")

    async def send_notification(self, method: str, params: dict[str, Any] | None = None):
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        data = json.dumps(notification) + "\n"
        if self._writer:
            self._writer.write(data.encode())
            await self._writer.drain()

    def on_notification(self, method: str, handler: Callable):
        self._notification_handlers[method] = handler

    async def close(self):
        if self._writer:
            self._writer.close()
        if hasattr(self, "_process") and self._process:
            self._process.terminate()
            await self._process.wait()


class McpError(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"MCP Error [{code}]: {message}")


class McpTimeoutError(Exception):
    pass

初期化ハンドシェイク

async def initialize_client(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {
            "roots": {"listChanged": True},
            "sampling": {},
        },
        "clientInfo": {
            "name": "toolsku-mcp-client",
            "version": "1.0.0",
        },
    })
    await client.send_notification("notifications/initialized")
    return result

使用例

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])

    server_info = await initialize_client(client)
    print(f"Connected to: {server_info['serverInfo']['name']}")

    tools = await client.send_request("tools/list")
    for tool in tools.get("tools", []):
        print(f"  - {tool['name']}: {tool['description']}")

    result = await client.send_request("tools/call", {
        "name": "search_docs",
        "arguments": {"query": "MCPプロトコル解析"},
    })
    print(f"Result: {result}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 2: SSEトランスポート実装

問題:stdioトランスポートはローカルプロセス通信にのみ適しています。本番環境ではネットワーク経由でリモートMCP Serverに接続する必要があります。SSE(Server-Sent Events)はMCPプロトコルがサポートするリモートトランスポート方式です。

解決策:接続管理、イベント解析、メッセージルーティング、自動再接続を含む完全なSSEトランスポート層を実装します。

SSEトランスポートアーキテクチャ

┌──────────────┐     POST /messages     ┌──────────────┐
│  MCP Client  │ ──────────────────────> │  MCP Server  │
│              │                          │              │
│  SSE Handler │ <──── GET /sse ──────── │  SSE Endpoint│
│              │     EventStream          │              │
└──────────────┘                          └──────────────┘
     │                                          │
     │  1. GET /sse でSSE接続を確立              │
     │  2. endpointイベントを受信しPOST URLを取得 │
     │  3. POST /messages でJSON-RPCリクエスト送信│
     │  4. SSE経由でJSON-RPCレスポンスを受信      │
     └──────────────────────────────────────────┘

完全な実装

import asyncio
import json
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
import httpx

logger = logging.getLogger(__name__)


@dataclass
class SseEvent:
    event: str = "message"
    data: str = ""
    id: str | None = None
    retry: int | None = None


class SseTransport:
    def __init__(
        self,
        server_url: str,
        reconnect_interval: float = 5.0,
        max_reconnect_attempts: int = 10,
        headers: dict[str, str] | None = None,
    ):
        self._server_url = server_url.rstrip("/")
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_attempts = max_reconnect_attempts
        self._headers = headers or {}
        self._message_endpoint: str | None = None
        self._session: httpx.AsyncClient | None = None
        self._running = False
        self._reconnect_count = 0
        self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
        self._last_event_id: str | None = None

    async def connect(self):
        self._session = httpx.AsyncClient(
            headers={**self._headers, "Accept": "text/event-stream"},
            timeout=httpx.Timeout(30.0, read=300.0),
        )
        self._running = True
        asyncio.create_task(self._sse_listener())

    async def _sse_listener(self):
        while self._running:
            try:
                sse_url = f"{self._server_url}/sse"
                params = {}
                if self._last_event_id:
                    params["Last-Event-ID"] = self._last_event_id

                async with self._session.stream("GET", sse_url, params=params) as response:
                    response.raise_for_status()
                    self._reconnect_count = 0

                    current_event = SseEvent()
                    async for line in response.aiter_lines():
                        if not self._running:
                            break

                        if line.startswith(":"):
                            continue

                        if line == "":
                            if current_event.data:
                                await self._dispatch_event(current_event)
                            current_event = SseEvent()
                            continue

                        if ":" in line:
                            field_name, _, field_value = line.partition(":")
                            field_value = field_value.lstrip(" ")

                            if field_name == "event":
                                current_event.event = field_value
                            elif field_name == "data":
                                if current_event.data:
                                    current_event.data += "\n"
                                current_event.data += field_value
                            elif field_name == "id":
                                current_event.id = field_value
                                self._last_event_id = field_value
                            elif field_name == "retry":
                                try:
                                    current_event.retry = int(field_value)
                                except ValueError:
                                    pass

            except httpx.HTTPStatusError as e:
                logger.error(f"SSE HTTP error: {e.response.status_code}")
            except httpx.RequestError as e:
                logger.error(f"SSE connection error: {e}")
            except Exception as e:
                logger.error(f"SSE unexpected error: {e}")

            if self._running:
                self._reconnect_count += 1
                if self._reconnect_count > self._max_reconnect_attempts:
                    logger.error("Max reconnect attempts reached, stopping")
                    self._running = False
                    break
                wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
                logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
                await asyncio.sleep(wait)

    async def _dispatch_event(self, event: SseEvent):
        if event.event == "endpoint":
            self._message_endpoint = f"{self._server_url}{event.data}"
            logger.info(f"Message endpoint: {self._message_endpoint}")

        handlers = self._event_handlers.get(event.event, [])
        for handler in handlers:
            try:
                if event.event == "endpoint":
                    await handler(event.data)
                else:
                    message = json.loads(event.data)
                    await handler(message)
            except Exception as e:
                logger.error(f"Event handler error: {e}")

    async def send_message(self, message: dict[str, Any]) -> None:
        if not self._message_endpoint:
            raise RuntimeError("SSE transport not initialized, endpoint not received")
        if not self._session:
            raise RuntimeError("SSE session not connected")

        response = await self._session.post(
            self._message_endpoint,
            json=message,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()

    def on_event(self, event_type: str, handler: Callable):
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    async def close(self):
        self._running = False
        if self._session:
            await self._session.aclose()

統合使用

class McpSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._request_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._transport.on_event("message", self._on_message)

    async def connect(self):
        await self._transport.connect()

    async def _on_message(self, message: dict):
        if "id" in message:
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
                else:
                    future.set_result(message.get("result"))

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        self._request_id += 1
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": self._request_id,
        }
        future: asyncio.Future = asyncio.get_event_loop().create_future()
        self._pending[self._request_id] = future
        await self._transport.send_message(request)

        try:
            return await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            self._pending.pop(self._request_id, None)
            raise McpTimeoutError(f"Request {method} timed out")

    async def initialize(self) -> dict:
        result = await self.send_request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
        })
        await self._transport.send_message({
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        })
        return result

    async def close(self):
        await self._transport.close()


async def main():
    client = McpSseClient("http://localhost:8080")
    await client.connect()
    info = await client.initialize()
    print(f"Connected: {info}")

    tools = await client.send_request("tools/list")
    print(f"Available tools: {tools}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 3: ツールディスカバリと動的登録

問題:MCP Serverのツールリストは動的です。クライアントはツールを自動発見し、LLM互換のfunction calling形式に変換する必要があり、ツール変更通知もサポートする必要があります。

解決策:ツールディスカバリ、スキーマ変換、動的登録、変更リスニングを実装します。

ツールディスカバリアーキテクチャ

┌──────────────┐  tools/list   ┌──────────────┐
│  MCP Client  │ ────────────> │  MCP Server  │
│              │ <──────────── │              │
│  Tool        │  Tool Schema  │  Tool        │
│  Registry    │               │  Definitions │
│              │  tools/       │              │
│  ┌────────┐  │  list_changed │              │
│  │ OpenAI │  │ <──────────── │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Claude │  │               │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Gemini │  │               │              │
│  │ Format │  │               │              │
│  └────────┘  │               │              │
└──────────────┘               └──────────────┘

完全な実装

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class McpTool:
    name: str
    description: str
    input_schema: dict[str, Any]
    annotations: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mcp(cls, data: dict) -> "McpTool":
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            input_schema=data.get("inputSchema", {}),
            annotations=data.get("annotations", {}),
        )


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, McpTool] = {}
        self._change_handlers: list[Callable] = []

    def register(self, tool: McpTool):
        self._tools[tool.name] = tool
        logger.info(f"Registered tool: {tool.name}")
        self._notify_change("registered", tool)

    def unregister(self, name: str):
        if name in self._tools:
            tool = self._tools.pop(name)
            logger.info(f"Unregistered tool: {name}")
            self._notify_change("unregistered", tool)

    def get(self, name: str) -> McpTool | None:
        return self._tools.get(name)

    def list_tools(self) -> list[McpTool]:
        return list(self._tools.values())

    def on_change(self, handler: Callable):
        self._change_handlers.append(handler)

    def _notify_change(self, action: str, tool: McpTool):
        for handler in self._change_handlers:
            try:
                handler(action, tool)
            except Exception as e:
                logger.error(f"Change handler error: {e}")

    def to_openai_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                },
            })
        return result

    def to_claude_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema,
            })
        return result

    def to_gemini_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            params = tool.input_schema.get("properties", {})
            required = tool.input_schema.get("required", [])

            declarations = []
            for param_name, param_schema in params.items():
                declarations.append({
                    "name": param_name,
                    "description": param_schema.get("description", ""),
                    "type": self._json_type_to_gemini(param_schema.get("type", "string")),
                    "required": param_name in required,
                })

            result.append({
                "name": tool.name,
                "description": tool.description,
                "parameters": {
                    "type": "OBJECT",
                    "properties": {d["name"]: {"type": d["type"]} for d in declarations},
                    "required": [d["name"] for d in declarations if d["required"]],
                },
            })
        return result

    @staticmethod
    def _json_type_to_gemini(json_type: str) -> str:
        mapping = {
            "string": "STRING",
            "integer": "INTEGER",
            "number": "NUMBER",
            "boolean": "BOOLEAN",
            "array": "ARRAY",
            "object": "OBJECT",
        }
        return mapping.get(json_type, "STRING")


class ToolDiscovery:
    def __init__(self, client):
        self._client = client
        self._registry = ToolRegistry()
        self._client.on_notification(
            "notifications/tools/list_changed",
            self._on_tools_changed,
        )

    @property
    def registry(self) -> ToolRegistry:
        return self._registry

    async def discover(self) -> list[McpTool]:
        result = await self._client.send_request("tools/list")
        tools = []
        for tool_data in result.get("tools", []):
            tool = McpTool.from_mcp(tool_data)
            self._registry.register(tool)
            tools.append(tool)
        logger.info(f"Discovered {len(tools)} tools")
        return tools

    async def _on_tools_changed(self, params: dict):
        logger.info("Tools list changed, re-discovering...")
        self._registry._tools.clear()
        await self.discover()

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        tool = self._registry.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")

        result = await self._client.send_request("tools/call", {
            "name": name,
            "arguments": arguments,
        })

        contents = result.get("content", [])
        text_parts = []
        for content in contents:
            if content.get("type") == "text":
                text_parts.append(content["text"])
            elif content.get("type") == "image":
                text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
            elif content.get("type") == "resource":
                text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")

        return "\n".join(text_parts) if text_parts else str(result)

動的登録の使用

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    tools = await discovery.discover()

    print(f"Discovered {len(tools)} tools:")
    for tool in tools:
        print(f"  - {tool.name}: {tool.description}")

    openai_tools = discovery.registry.to_openai_tools()
    print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2, ensure_ascii=False)}")

    claude_tools = discovery.registry.to_claude_tools()
    print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2, ensure_ascii=False)}")

    result = await discovery.call_tool("search_docs", {"query": "MCPプロトコル解析"})
    print(f"\nTool result: {result}")

    await client.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Pattern 4: マルチツールシーケンシャルオーケストレーション

問題:AI Agentが複数のツールを順番に呼び出す必要があり、後のツールの入力が前のツールの出力に依存する場合、このようなシーケンシャル依存チェーンをどのようにエレガントに管理するか?

解決策:依存性注入、中間結果の受け渡し、条件分岐をサポートするシーケンシャルオーケストレーターを実装します。

シーケンシャルオーケストレーションフロー

ユーザー入力: "MCPドキュメントを検索して要約を翻訳"
    │
    ▼
┌──────────┐    result     ┌──────────┐    result     ┌──────────┐
│ Tool 1   │ ────────────> │ Tool 2   │ ────────────> │ Tool 3   │
│ 検索     │               │ 要約抽出 │               │ 翻訳     │
└──────────┘               └──────────┘               └──────────┘
    │                          │                          │
    │ $search_result           │ $summary_text            │ $translated
    ▼                          ▼                          ▼
 Context Store            Context Store              Context Store

完全な実装

import asyncio
import json
import re
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class StepResult:
    step_name: str
    tool_name: str
    input_args: dict[str, Any]
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


@dataclass
class PipelineStep:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    condition: Callable[[dict[str, Any]], bool] | None = None
    timeout: float = 30.0
    retry_count: int = 0

    def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
        if callable(self.arguments):
            return self.arguments(context)
        resolved = {}
        for key, value in self.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


def _deep_get(data: dict, path: str) -> Any:
    keys = path.split(".")
    current = data
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else None
        else:
            return None
    return current


class SequentialOrchestrator:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery
        self._steps: list[PipelineStep] = []
        self._context: dict[str, Any] = {}

    def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
        self._steps.append(step)
        return self

    def set_context(self, key: str, value: Any):
        self._context[key] = value

    async def execute(self) -> list[StepResult]:
        results = []
        context = dict(self._context)

        for step in self._steps:
            if step.condition and not step.condition(context):
                logger.info(f"Skipping step '{step.name}': condition not met")
                continue

            resolved_args = step.resolve_arguments(context)
            logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= step.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(step.tool_name, resolved_args),
                        timeout=step.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000

                    context[f"steps.{step.name}"] = output
                    context["last_result"] = output

                    results.append(StepResult(
                        step_name=step.name,
                        tool_name=step.tool_name,
                        input_args=resolved_args,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    ))
                    break

                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= step.retry_count:
                        logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
                        await asyncio.sleep(1.0 * attempt)
            else:
                duration = (time.monotonic() - start) * 1000
                results.append(StepResult(
                    step_name=step.name,
                    tool_name=step.tool_name,
                    input_args=resolved_args,
                    output=None,
                    success=False,
                    error=last_error,
                    duration_ms=duration,
                ))
                break

        return results

使用例

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = SequentialOrchestrator(discovery)
    orchestrator.set_context("user_query", "MCPプロトコル解析とPythonクライアント統合")

    orchestrator.add_step(PipelineStep(
        name="search",
        tool_name="search_docs",
        arguments={"query": "$user_query", "limit": 5},
    ))

    orchestrator.add_step(PipelineStep(
        name="extract",
        tool_name="extract_summary",
        arguments={"text": "$steps.search"},
        condition=lambda ctx: bool(ctx.get("steps.search")),
    ))

    orchestrator.add_step(PipelineStep(
        name="translate",
        tool_name="translate_text",
        arguments={"text": "$steps.extract", "target_lang": "en"},
        condition=lambda ctx: bool(ctx.get("steps.extract")),
    ))

    results = await orchestrator.execute()

    for r in results:
        status = "OK" if r.success else "FAIL"
        print(f"[{status}] [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
        if r.error:
            print(f"  Error: {r.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 5: マルチツールパラレルオーケストレーション

問題:複数のツール呼び出し間に依存関係がない場合、シーケンシャル実行は時間を無駄にします。複数のMCPツールを安全に並列呼び出しし、結果を集約するには?

解決策:asyncioベースの並行制御、結果集約、DAG依存実行をサポートするパラレルオーケストレーターを実装します。

パラレルオーケストレーションアーキテクチャ

                    ┌──────────┐
                    │  リクエスト│
                    └────┬─────┘
                         │
              ┌──────────┼──────────┐
              │          │          │
              ▼          ▼          ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ Tool A   │ │ Tool B   │ │ Tool C   │
        │ 検索     │ │ DB照会   │ │ API呼出  │
        └────┬─────┘ └────┬─────┘ └────┬─────┘
             │             │             │
             └──────────┬──┘────────────┘
                        │
                        ▼
                 ┌─────────────┐
                 │  結果集約    │
                 └─────────────┘

完全な実装

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class ParallelTask:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    depends_on: list[str] = field(default_factory=list)
    timeout: float = 30.0
    retry_count: int = 0


@dataclass
class TaskResult:
    name: str
    tool_name: str
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


class ParallelOrchestrator:
    def __init__(self, discovery: ToolDiscovery, max_concurrency: int = 5):
        self._discovery = discovery
        self._max_concurrency = max_concurrency
        self._tasks: list[ParallelTask] = []

    def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
        self._tasks.append(task)
        return self

    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        context = dict(initial_context or {})
        results: dict[str, TaskResult] = {}
        semaphore = asyncio.Semaphore(self._max_concurrency)
        completed = set()
        remaining = list(self._tasks)

        while remaining:
            ready = [
                t for t in remaining
                if all(dep in completed for dep in t.depends_on)
            ]

            if not ready and remaining:
                logger.error("Circular dependency detected")
                break

            coros = [self._execute_task(task, context, results, semaphore) for task in ready]
            task_results = await asyncio.gather(*coros, return_exceptions=True)

            for task, result in zip(ready, task_results):
                if isinstance(result, Exception):
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=str(result),
                    )
                else:
                    results[task.name] = result
                    context[f"tasks.{task.name}"] = result.output

                completed.add(task.name)
                remaining.remove(task)

        return results

    async def _execute_task(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
        semaphore: asyncio.Semaphore,
    ) -> TaskResult:
        async with semaphore:
            resolved_args = self._resolve_arguments(task, context, results)

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= task.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(task.tool_name, resolved_args),
                        timeout=task.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000
                    return TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    )
                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= task.retry_count:
                        await asyncio.sleep(0.5 * attempt)

            duration = (time.monotonic() - start) * 1000
            return TaskResult(
                name=task.name,
                tool_name=task.tool_name,
                output=None,
                success=False,
                error=last_error,
                duration_ms=duration,
            )

    def _resolve_arguments(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
    ) -> dict[str, Any]:
        if callable(task.arguments):
            return task.arguments(context)

        resolved = {}
        for key, value in task.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                if ref_path.startswith("tasks."):
                    task_name = ref_path.split(".")[1]
                    if task_name in results:
                        resolved[key] = results[task_name].output
                else:
                    resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


class DagOrchestrator(ParallelOrchestrator):
    def validate_dag(self) -> list[str]:
        errors = []
        task_names = {t.name for t in self._tasks}
        for task in self._tasks:
            for dep in task.depends_on:
                if dep not in task_names:
                    errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")

        visited = set()
        path = set()

        def has_cycle(name: str) -> bool:
            visited.add(name)
            path.add(name)
            task = next((t for t in self._tasks if t.name == name), None)
            if task:
                for dep in task.depends_on:
                    if dep in path:
                        return True
                    if dep not in visited and has_cycle(dep):
                        return True
            path.remove(name)
            return False

        for task in self._tasks:
            if task.name not in visited:
                if has_cycle(task.name):
                    errors.append("Circular dependency detected in DAG")
                    break

        return errors

使用例

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)

    orchestrator.add_task(ParallelTask(
        name="search_web",
        tool_name="web_search",
        arguments={"query": "MCPプロトコル 2026"},
    ))

    orchestrator.add_task(ParallelTask(
        name="search_docs",
        tool_name="search_docs",
        arguments={"query": "Model Context Protocol Python"},
    ))

    orchestrator.add_task(ParallelTask(
        name="query_db",
        tool_name="query_database",
        arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
    ))

    orchestrator.add_task(ParallelTask(
        name="merge_results",
        tool_name="merge_and_deduplicate",
        arguments={"sources": "$tasks.search_web,sources: $tasks.search_docs,sources: $tasks.query_db"},
        depends_on=["search_web", "search_docs", "query_db"],
    ))

    results = await orchestrator.execute({"user_query": "MCPクライアント統合"})

    for name, result in results.items():
        status = "OK" if result.success else "FAIL"
        print(f"[{status}] [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
        if result.error:
            print(f"  Error: {result.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 6: エラー処理とリトライ

問題:MCPクライアントは、ネットワーク不安定、サーバー過負荷、ツール実行例外時に堅牢なエラー処理とリトライメカニズムが必要です。

解決策:指数バックオフリトライ、サーキットブレーカー、フォールバック戦略による階層型エラー処理を実装します。

エラー処理アーキテクチャ

┌────────────────────────────────────────────┐
│              Error Handling Layer           │
│                                            │
│  ┌──────────┐  ┌──────────┐  ┌─────────┐  │
│  │ Retry    │  │ Circuit  │  │ Fallback│  │
│  │ Strategy │  │ Breaker  │  │ Handler │  │
│  └──────────┘  └──────────┘  └─────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │        Error Classification          │  │
│  │  Transient / Permanent / Unknown     │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘

完全な実装

import asyncio
import logging
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: list[int] = field(default_factory=lambda: [
        -32603,  # Internal error
        -32000,  # Server error
    ])


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig | None = None):
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0
        self._half_open_calls = 0

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.HALF_OPEN:
            if self._half_open_calls < self._config.half_open_max_calls:
                self._half_open_calls += 1
                return True
            return False
        return False

    def record_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._success_count = 0
        else:
            self._failure_count = 0

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.OPEN
            self._success_count = 0
        elif self._failure_count >= self._config.failure_threshold:
            self._state = CircuitState.OPEN


class ResilientMcpClient:
    def __init__(
        self,
        client,
        retry_config: RetryConfig | None = None,
        circuit_config: CircuitBreakerConfig | None = None,
    ):
        self._client = client
        self._retry_config = retry_config or RetryConfig()
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_config or CircuitBreakerConfig()
        self._fallback_handlers: dict[str, Callable] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self._circuit_breakers:
            self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
        return self._circuit_breakers[server_name]

    def register_fallback(self, tool_name: str, handler: Callable):
        self._fallback_handlers[tool_name] = handler

    def classify_error(self, error: Exception) -> ErrorCategory:
        if isinstance(error, (ConnectionError, asyncio.TimeoutError, OSError)):
            return ErrorCategory.TRANSIENT
        if isinstance(error, McpError):
            if error.code in self._retry_config.retryable_errors:
                return ErrorCategory.TRANSIENT
            if error.code in [-32600, -32601, -32602]:
                return ErrorCategory.PERMANENT
        return ErrorCategory.UNKNOWN

    async def call_with_retry(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        server_name: str = "default",
    ) -> Any:
        breaker = self._get_breaker(server_name)

        if not breaker.allow_request():
            fallback = self._fallback_handlers.get(tool_name)
            if fallback:
                logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
                return await fallback(arguments)
            raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")

        last_error = None
        for attempt in range(1, self._retry_config.max_attempts + 1):
            try:
                result = await self._client.call_tool(tool_name, arguments)
                breaker.record_success()
                return result
            except Exception as e:
                last_error = e
                category = self.classify_error(e)

                if category == ErrorCategory.PERMANENT:
                    breaker.record_failure()
                    raise

                if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
                    delay = min(
                        self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
                        self._retry_config.max_delay,
                    )
                    if self._retry_config.jitter:
                        import random
                        delay *= (0.5 + random.random() * 0.5)

                    logger.warning(
                        f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
                else:
                    breaker.record_failure()

        if fallback := self._fallback_handlers.get(tool_name):
            logger.warning(f"All retries exhausted for '{tool_name}', using fallback")
            return await fallback(arguments)

        raise last_error


class CircuitOpenError(Exception):
    pass

使用例

async def fallback_search(arguments: dict) -> str:
    return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})


async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    resilient = ResilientMcpClient(
        discovery,
        retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
        circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
    )
    resilient.register_fallback("search_docs", fallback_search)

    try:
        result = await resilient.call_with_retry("search_docs", {"query": "MCPプロトコル解析"})
        print(f"Result: {result}")
    except CircuitOpenError as e:
        print(f"Service unavailable: {e}")
    except Exception as e:
        print(f"Failed: {e}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 7: ストリーミングレスポンス処理

問題:長時間実行されるツール呼び出しにはリアルタイムの進捗フィードバックが必要です。MCPプロトコルはSSEとNotificationによるストリーミングレスポンスをサポートしています。

解決策:進捗通知、部分結果集約、キャンセル操作をサポートするストリーミングレスポンスハンドラーを実装します。

ストリーミングレスポンスアーキテクチャ

┌──────────┐  tools/call   ┌──────────┐
│  Client  │ ────────────> │  Server  │
│          │               │          │
│  Stream  │ <── notify ── │  Tool    │
│  Handler │   progress    │  Running │
│          │ <── notify ── │          │
│          │   progress    │          │
│          │ <── result ── │          │
│          │   complete    │          │
└──────────┘               └──────────┘

完全な実装

import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum

logger = logging.getLogger(__name__)


class StreamEventType(Enum):
    PROGRESS = "progress"
    PARTIAL_RESULT = "partial_result"
    COMPLETE = "complete"
    ERROR = "error"
    LOG = "log"


@dataclass
class StreamEvent:
    type: StreamEventType
    data: Any
    progress: float | None = None
    message: str | None = None
    timestamp: float = field(default_factory=lambda: asyncio.get_event_loop().time())


@dataclass
class StreamResult:
    events: list[StreamEvent] = field(default_factory=list)
    final_result: Any = None
    success: bool = True
    error: str | None = None
    total_duration_ms: float = 0.0


class StreamHandler:
    def __init__(self):
        self._progress_handlers: list[Callable[[StreamEvent], Any]] = []
        self._partial_handlers: list[Callable[[StreamEvent], Any]] = []
        self._complete_handlers: list[Callable[[StreamEvent], Any]] = []
        self._error_handlers: list[Callable[[StreamEvent], Any]] = []
        self._log_handlers: list[Callable[[StreamEvent], Any]] = []

    def on_progress(self, handler: Callable[[StreamEvent], Any]):
        self._progress_handlers.append(handler)

    def on_partial_result(self, handler: Callable[[StreamEvent], Any]):
        self._partial_handlers.append(handler)

    def on_complete(self, handler: Callable[[StreamEvent], Any]):
        self._complete_handlers.append(handler)

    def on_error(self, handler: Callable[[StreamEvent], Any]):
        self._error_handlers.append(handler)

    def on_log(self, handler: Callable[[StreamEvent], Any]):
        self._log_handlers.append(handler)

    async def handle_notification(self, params: dict[str, Any]):
        method = params.get("method", "")

        if method == "notifications/progress":
            progress_data = params.get("progress", {})
            event = StreamEvent(
                type=StreamEventType.PROGRESS,
                data=progress_data,
                progress=progress_data.get("progress", 0) / max(progress_data.get("total", 1), 1),
                message=progress_data.get("message"),
            )
            for handler in self._progress_handlers:
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Progress handler error: {e}")

        elif method == "notifications/message":
            level = params.get("level", "info")
            event = StreamEvent(
                type=StreamEventType.LOG,
                data=params.get("data"),
                message=f"[{level}] {params.get('data', {}).get('text', '')}",
            )
            for handler in self._log_handlers:
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Log handler error: {e}")


class StreamingMcpClient:
    def __init__(self, client, discovery: ToolDiscovery):
        self._client = client
        self._discovery = discovery
        self._stream_handler = StreamHandler()
        self._active_calls: dict[str, asyncio.Task] = {}

        client.on_notification(
            "notifications/progress",
            self._stream_handler.handle_notification,
        )
        client.on_notification(
            "notifications/message",
            self._stream_handler.handle_notification,
        )

    @property
    def stream(self) -> StreamHandler:
        return self._stream_handler

    async def call_with_stream(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        timeout: float = 120.0,
    ) -> StreamResult:
        import time
        start = time.monotonic()
        result = StreamResult()

        self._stream_handler.on_complete(lambda e: setattr(result, "final_result", e.data))
        self._stream_handler.on_error(lambda e: setattr(result, "error", e.message))

        try:
            call_result = await asyncio.wait_for(
                self._discovery.call_tool(tool_name, arguments),
                timeout=timeout,
            )
            result.final_result = call_result
            result.success = True
            result.events.append(StreamEvent(
                type=StreamEventType.COMPLETE,
                data=call_result,
            ))
        except asyncio.TimeoutError:
            result.success = False
            result.error = f"Tool call timed out after {timeout}s"
            result.events.append(StreamEvent(
                type=StreamEventType.ERROR,
                data=None,
                message=result.error,
            ))
        except Exception as e:
            result.success = False
            result.error = str(e)
            result.events.append(StreamEvent(
                type=StreamEventType.ERROR,
                data=None,
                message=str(e),
            ))

        result.total_duration_ms = (time.monotonic() - start) * 1000
        return result

    async def cancel(self, request_id: int | str):
        await self._client.send_notification("notifications/cancelled", {
            "requestId": request_id,
            "reason": "User cancelled",
        })

使用例

async def main():
    client = JsonRpcClient(timeout=120.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    streaming = StreamingMcpClient(client, discovery)

    streaming.stream.on_progress(lambda e: print(f"  Progress: {e.progress:.0%} - {e.message or ''}"))
    streaming.stream.on_log(lambda e: print(f"  Log: {e.message}"))
    streaming.stream.on_complete(lambda e: print(f"  Complete!"))
    streaming.stream.on_error(lambda e: print(f"  Error: {e.message}"))

    print("Calling long-running tool with streaming...")
    result = await streaming.call_with_stream(
        "batch_process",
        {"items": list(range(100)), "operation": "analyze"},
        timeout=120.0,
    )

    print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
    if result.final_result:
        print(f"Output: {str(result.final_result)[:200]}...")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

5つのよくある落とし穴と解決策

1. 初期化ハンドシェイクでのinitialized通知の漏れ

落とし穴initializeリクエスト送信後、notifications/initialized通知の送信を忘れる。Serverはハンドシェイク完了を待ち続け、後続のリクエストがすべてタイムアウトする。

解決策

async def safe_initialize(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "1.0.0"},
    })
    # initialized通知を必ず送信
    await client.send_notification("notifications/initialized")
    return result

2. SSEエンドポイント準備前のリクエスト送信

落とし穴:SSE接続確立後、endpointイベントを受信する前にJSON-RPCリクエストを即座に送信し、間違ったURLにリクエストが送られる。

解決策

class SafeSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, data):
        self._endpoint_ready.set()

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
        # これで安全にリクエスト送信可能
        ...

3. ツールスキーマ変換でのrequiredフィールドの欠落

落とし穴:MCPツールスキーマをOpenAI function calling形式に変換する際、inputSchemaをそのままparametersとして使用するが、一部のLLMはrequiredフィールドの存在を要求する。

解決策

def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
    schema = dict(mcp_tool.input_schema)
    if "required" not in schema:
        schema["required"] = []
    if "type" not in schema:
        schema["type"] = "object"
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description,
            "parameters": schema,
        },
    }

4. パラレル呼び出し時の共有状態の競合

落とし穴:複数の並列ツール呼び出しが同じhttpx.AsyncClientasyncio.Future辞書を共有し、レスポンスの誤一致が発生する。

解決策:各並列呼び出しで独立したリクエストID空間を使用するか、接続プールの分離を行う:

class IsolatedClientPool:
    def __init__(self, base_url: str, pool_size: int = 5):
        self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
        self._index = 0
        self._lock = asyncio.Lock()

    async def get_client(self) -> McpSseClient:
        async with self._lock:
            client = self._clients[self._index % len(self._clients)]
            self._index += 1
            return client

5. ストリーミングレスポンスでの進捗通知と最終結果の混同

落とし穴:SSEチャネルで進捗通知と最終結果が同じストリームを共有し、クライアントが進捗通知を最終結果として処理してしまう。

解決策:メッセージタイプを厳密に区別し、idフィールドを持つレスポンスのみをリクエスト結果として処理する:

async def _handle_sse_message(self, message: dict):
    if "id" in message:
        # これはリクエストレスポンス、保留中のfutureと照合
        future = self._pending.pop(message["id"], None)
        if future and not future.done():
            future.set_result(message.get("result"))
    elif "method" in message:
        # これは通知、ストリームハンドラーに転送
        await self._stream_handler.handle_notification(message)

10のよくあるエラートラブルシューティング

エラーメッセージ 原因 解決策
MCP Error [-32600]: Invalid Request JSON-RPCメッセージ形式エラー、必須フィールドの欠落 リクエストにjsonrpcmethodidフィールドが含まれているか確認。/ja/json/formatで形式を検証
MCP Error [-32601]: Method not found サーバーがサポートしていないメソッドの呼び出し 先にinitializeでサーバーの機能を確認。メソッド名のスペルチェック
MCP Error [-32602]: Invalid params ツールパラメータの型や構造の不一致 tools/listで正しいinputSchemaを取得し、パラメータを照合
Connection refused MCP Serverが起動していない、またはポートが間違っている Serverプロセスの実行を確認。ポートとURL設定をチェック
SSE connection timeout ネットワークが到達不能、またはServerがSSEハンドシェイクに応答しない ファイアウォールルールを確認。SSEエンドポイントの到達可能性を確認
Request timed out after 30s ツール実行がクライアントのタイムアウト設定を超過 タイムアウトパラメータを増やすか、ストリーミングレスポンスを使用
Circuit breaker open 連続失敗回数がサーキットブレーカーの閾値を超過 Serverの健全性を確認。リカバリタイムアウト後にリトライ
Tool not found: xxx 未登録のツール名の呼び出し tools/listを再実行してディスカバリ。ツール名の大文字小文字を確認
Endpoint not received SSE接続後にendpointイベントを受信していない Serverが正しくendpointイベントを送信しているか確認。SSEルート設定をチェック
JSON decode error in SSE stream SSEデータが有効なJSONではない Server側のSSEイベント形式を確認。dataフィールドが有効なJSONであることを確認

高度な最適化テクニック

1. 接続プールとマルチServer管理

本番環境では、AI Agentは通常複数のMCP Serverに同時接続する必要があります。接続プールを使用して複数のクライアントインスタンスを管理します:

class McpConnectionPool:
    def __init__(self):
        self._connections: dict[str, Any] = {}
        self._discovery_map: dict[str, ToolDiscovery] = {}

    async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
        client = JsonRpcClient(timeout=30.0)
        await client.connect_stdio(command, args)
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    async def connect_sse(self, name: str, url: str):
        client = McpSseClient(url)
        await client.connect()
        await client.initialize()
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    def get_discovery(self, name: str) -> ToolDiscovery | None:
        return self._discovery_map.get(name)

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        for name, discovery in self._discovery_map.items():
            if discovery.registry.get(tool_name):
                return await discovery.call_tool(tool_name, arguments)
        raise ValueError(f"Tool '{tool_name}' not found in any connected server")

    async def close_all(self):
        for client in self._connections.values():
            await client.close()

2. ツール呼び出しキャッシュ

冪等なツール呼び出し結果をキャッシュし、重複リクエストを削減します:

import hashlib
import json
from typing import Any


class ToolCallCache:
    def __init__(self, ttl: float = 300.0, max_size: int = 1000):
        self._cache: dict[str, tuple[float, Any]] = {}
        self._ttl = ttl
        self._max_size = max_size

    def _make_key(self, tool_name: str, arguments: dict) -> str:
        raw = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()

    def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
        key = self._make_key(tool_name, arguments)
        if key in self._cache:
            ts, value = self._cache[key]
            if asyncio.get_event_loop().time() - ts < self._ttl:
                return True, value
            del self._cache[key]
        return False, None

    def set(self, tool_name: str, arguments: dict, value: Any):
        if len(self._cache) >= self._max_size:
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        key = self._make_key(tool_name, arguments)
        self._cache[key] = (asyncio.get_event_loop().time(), value)

    def invalidate(self, tool_name: str, arguments: dict | None = None):
        if arguments:
            key = self._make_key(tool_name, arguments)
            self._cache.pop(key, None)
        else:
            prefix = hashlib.sha256(json.dumps({"tool": tool_name}).encode()).hexdigest()[:8]
            keys_to_remove = [k for k in self._cache if k.startswith(prefix)]
            for k in keys_to_remove:
                del self._cache[k]

3. オブザーバビリティとトレーシング

MCPクライアント呼び出しにOpenTelemetryトレーシングを追加します:

from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")

tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram("mcp.tool.duration", description="MCP tool call duration")


class ObservableMcpClient:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        with tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
            span.set_attribute("mcp.tool.name", tool_name)
            span.set_attribute("mcp.tool.args_count", len(arguments))

            import time
            start = time.monotonic()
            try:
                result = await self._discovery.call_tool(tool_name, arguments)
                duration = time.monotonic() - start

                span.set_status(Status(StatusCode.OK))
                tool_call_counter.add(1, {"tool": tool_name, "status": "success"})
                tool_call_duration.record(duration, {"tool": tool_name})

                return result
            except Exception as e:
                duration = time.monotonic() - start
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                tool_call_counter.add(1, {"tool": tool_name, "status": "error"})
                tool_call_duration.record(duration, {"tool": tool_name})
                raise

比較分析:3つのオーケストレーションアプローチ

次元 シーケンシャル パラレル DAG
実行方式 順次実行、前のステップ完了後に次を実行 全タスクを同時実行 依存関係に基づくトポロジカルソートで実行
適用シナリオ ツール間に厳密なデータ依存がある場合 ツール間が完全に独立している場合 一部依存、一部独立
レイテンシ 全ツール所要時間の合計 最も遅いツールの所要時間 クリティカルパス上の所要時間の合計
エラー伝播 前のステップの失敗で後続を全スキップ 単一の失敗は他に影響しない 失敗ノードの下流をスキップ
実装複雑度
リソース消費 低(同時に1つの接続のみ使用) 高(並行接続プールが必要) 中(レイヤーごとの並行)
結果の一貫性 強(決定的な順序) 弱(非決定的な完了順序) 中(同じレイヤー内は非決定的)
タイムアウト制御 ステップごとに独立したタイムアウト グローバル+タスクごとのタイムアウト ステップごと+グローバルタイムアウト
典型的シナリオ 検索→抽出→翻訳 3つのデータソースを同時検索 検索A+B→マージ→翻訳

選択ガイド

  • ツール間に厳密な因果依存がある → シーケンシャルオーケストレーション
  • ツール間が完全に独立 → パラレルオーケストレーション
  • ツール間に部分的な依存がある → DAGオーケストレーション

AI Agentワークフローオーケストレーションの詳細については、AI AgentワークフローDAGオーケストレーションAI Agentプロトコル比較 をご覧ください。


オンラインツール推奨

  • JSONフォーマッター:MCPプロトコルメッセージのデバッグ時に、/ja/json/format でJSON-RPCリクエストとレスポンスをフォーマット
  • cURL to Code:MCP Serverエンドポイントのテスト時に、/ja/dev/curl-to-code でPythonクライアントコードを素早く生成
  • Base64エンコード/デコード:MCPトランスポートのバイナリデータ処理時に、/ja/encode/base64 でエンコード/デコード

まとめ:Python MCPクライアント統合は、単にJSON-RPCリクエストを送信するだけではありません。基本的なstdio接続からSSEリモートトランスポートまで、ツールディスカバリと動的登録からマルチツールのシーケンシャル/パラレル/DAGオーケストレーションまで、サーキットブレーカーとフォールバックによるエラー処理からストリーミングレスポンスと進捗追跡まで—あらゆる側面で本番レベルのエンジニアリングが求められます。この7つのパターンをマスターすれば、AI Agentは信頼性が高く、効率的で、観測可能なツール呼び出し能力を持つことになります。覚えておいてください:MCPプロトコル解析は基盤、マルチツールオーケストレーションは核心、エラー処理は保障、ストリーミングレスポンスは体験です。最新のプロトコル定義については、公式の Model Context Protocol Specification を参照してください。

ブラウザローカルツールを無料で試す →

#Python#MCP#Model Context Protocol#AI工具编排#函数调用#多工具集成#2026#AI与大数据