Python MCP Client Integration: 7 Production Patterns from Protocol Parsing to Multi-Tool Orchestration


Why Your AI App Can Never Connect to External Tools: 5 Pain Points of MCP Client Integration

  • You spent three days building a perfect MCP Server, but Claude Desktop won't connect?
  • Your AI Agent calls three tools, and when the first one times out, the rest all fail?
  • Tool discovery returns a pile of schemas, but you don't know how to convert them to an LLM-compatible function calling format?
  • SSE connections keep dropping and reconnecting, silently losing data?
  • Multi-tool orchestration is either too slow when run sequentially or too chaotic when run in parallel?

The root cause: most developers focus only on MCP Server development and underestimate the complexity of client-side integration. The MCP protocol standardizes the messages on the wire, but the client still has to handle JSON-RPC message construction, transport adaptation, tool discovery and schema conversion, orchestration strategy, and error recovery.

Key Takeaways:

  • Master MCP protocol JSON-RPC 2.0 communication for reliable client-server connections
  • Learn tool discovery and dynamic registration for AI Agent auto-adaptation
  • Understand 7 production patterns for multi-tool orchestration, from sequential calls to parallel DAG execution
  • Build robust error handling and retry mechanisms for network jitter and service unavailability
  • Master SSE streaming response processing for real-time tool call progress feedback


MCP Protocol Core Concepts

Before diving into the 7 production patterns, we need to understand where the MCP client sits in the protocol stack and its core concepts.

MCP Client Architecture Overview

┌─────────────────────────────────────────────────┐
│                  AI Application                  │
│  (Claude Desktop / ChatGPT / Custom Agent)      │
├─────────────────────────────────────────────────┤
│               MCP Client Layer                   │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ JSON-RPC  │ │Transport │ │ Tool Registry  │  │
│  │ Handler   │ │ Adapter  │ │ & Discovery    │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
│  ┌───────────┐ ┌──────────┐ ┌────────────────┐  │
│  │ Orchest-  │ │ Error &  │ │ Stream         │  │
│  │ ration    │ │ Retry    │ │ Handler        │  │
│  └───────────┘ └──────────┘ └────────────────┘  │
├─────────────────────────────────────────────────┤
│              Transport Layer                     │
│     stdio / SSE / Streamable HTTP               │
├─────────────────────────────────────────────────┤
│              MCP Server                          │
│  Tools / Resources / Prompts                    │
└─────────────────────────────────────────────────┘

Core Concepts Reference

Concept        | Description                              | JSON-RPC Method         | Analogy
---------------|------------------------------------------|-------------------------|--------------------
Initialize     | Client-server handshake                  | initialize              | TCP 3-way handshake
Tool Discovery | Get the list of tools the server supports | tools/list             | DNS lookup
Tool Call      | Call a specific tool and get results     | tools/call              | HTTP POST
Resource Read  | Read resources exposed by the server     | resources/read          | HTTP GET
Prompt Get     | Get predefined prompt templates          | prompts/get             | Template engine
Notification   | One-way notification, no response needed | (no id field)           | UDP broadcast
Cancel         | Cancel an in-progress request            | notifications/cancelled | HTTP cancellation

JSON-RPC 2.0 Message Format

MCP is built on JSON-RPC 2.0, so understanding the three message shapes below (request, success response, error response) is the foundation of client development:

// Request message
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "search_docs",
    "arguments": {"query": "MCP protocol"}
  },
  "id": 1
}

// Success response
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {"type": "text", "text": "Search results..."}
    ]
  },
  "id": 1
}

// Error response
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32600,
    "message": "Invalid Request"
  },
  "id": 1
}

If you're interested in MCP Server development, check out our previous article: Python MCP Server Development Guide. For a deeper understanding of AI function calling protocols, see AI Function Calling in Production.


Pattern 1: Basic JSON-RPC Client

Problem: Your AI application needs to connect to an MCP Server, but you don't know how to construct proper JSON-RPC messages, manage request IDs, or handle responses.

Solution: Build a complete JSON-RPC 2.0 client that encapsulates message construction, sending, response matching, and timeout handling.

Complete Implementation

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class JsonRpcRequest:
    jsonrpc: str = "2.0"
    method: str = ""
    params: dict[str, Any] = field(default_factory=dict)
    id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)


@dataclass
class JsonRpcResponse:
    id: int | str
    result: Any = None
    error: dict[str, Any] | None = None


class JsonRpcClient:
    """Minimal JSON-RPC 2.0 client speaking newline-delimited JSON over stdio."""

    def __init__(self, timeout: float = 30.0):
        self._request_id = 0
        self._pending: dict[int | str, asyncio.Future] = {}
        self._timeout = timeout
        self._notification_handlers: dict[str, Callable] = {}
        self._writer: asyncio.StreamWriter | None = None
        self._reader: asyncio.StreamReader | None = None
        self._process: asyncio.subprocess.Process | None = None
        # Strong references so background tasks are not garbage-collected mid-flight.
        self._tasks: set[asyncio.Task] = set()

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    def _spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
        import os
        proc_env = {**os.environ, **(env or {})}
        self._process = await asyncio.create_subprocess_exec(
            command,
            *(args or []),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Inherit the parent's stderr: a PIPE that nobody reads can fill up
            # and block the server.
            stderr=None,
            env=proc_env,
        )
        self._reader = self._process.stdout
        self._writer = self._process.stdin
        self._spawn(self._read_loop())

    async def _read_loop(self):
        try:
            while self._reader:
                line = await self._reader.readline()
                if not line:  # EOF: the server closed stdout or exited
                    break
                try:
                    message = json.loads(line.decode().strip())
                except json.JSONDecodeError:
                    continue  # skip non-JSON output such as stray log lines
                if isinstance(message, dict):
                    await self._handle_message(message)
        finally:
            # Fail outstanding requests now instead of letting each one time out.
            for future in self._pending.values():
                if not future.done():
                    future.set_exception(ConnectionError("MCP server closed the connection"))
            self._pending.clear()

    async def _handle_message(self, message: dict):
        if "method" in message:
            if "id" in message:
                # Server-initiated request: this minimal client implements none,
                # so answer with the standard "Method not found" error.
                await self._send({
                    "jsonrpc": "2.0",
                    "id": message["id"],
                    "error": {"code": -32601, "message": "Method not found"},
                })
                return
            handler = self._notification_handlers.get(message["method"])
            if handler:
                # Run in the background: a handler that calls send_request()
                # would otherwise block the read loop that delivers its response.
                self._spawn(handler(message.get("params", {})))
            return

        future = self._pending.pop(message.get("id"), None)
        if future and not future.done():
            if "error" in message:
                future.set_exception(
                    McpError(message["error"]["code"], message["error"]["message"])
                )
            else:
                future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))

    async def _send(self, message: dict[str, Any]) -> None:
        if not self._writer:
            raise RuntimeError("Not connected: call connect_stdio() first")
        self._writer.write((json.dumps(message) + "\n").encode())
        await self._writer.drain()

    async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        request_id = self._next_id()
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future

        try:
            await self._send(request)
            response = await asyncio.wait_for(future, timeout=self._timeout)
            return response.result
        except asyncio.TimeoutError:
            raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s") from None
        finally:
            self._pending.pop(request_id, None)

    async def send_notification(self, method: str, params: dict[str, Any] | None = None):
        await self._send({
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        })

    def on_notification(self, method: str, handler: Callable):
        # Handlers must be coroutine functions taking the notification params.
        self._notification_handlers[method] = handler

    async def close(self):
        for task in list(self._tasks):
            task.cancel()
        if self._writer:
            self._writer.close()
        if self._process and self._process.returncode is None:
            self._process.terminate()
            await self._process.wait()


class McpError(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"MCP Error [{code}]: {message}")


class McpTimeoutError(Exception):
    pass

Initialization Handshake

async def initialize_client(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        # Advertise only what this client implements. Add "roots" or "sampling"
        # here once it can answer the matching server-initiated requests.
        "capabilities": {},
        "clientInfo": {
            "name": "toolsku-mcp-client",
            "version": "1.0.0",
        },
    })
    await client.send_notification("notifications/initialized")
    return result
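
Before using the connection, it is worth checking what the server actually agreed to. The sketch below validates an `initialize` result and collects the advertised capability names. The `SUPPORTED_VERSIONS` set and the `check_handshake` helper are assumptions made for illustration, and the sample result is invented data in the shape the handshake returns.

```python
from typing import Any

# Assumption: the protocol revisions this hypothetical client was written against.
SUPPORTED_VERSIONS = {"2025-03-26", "2024-11-05"}


def check_handshake(result: dict[str, Any]) -> set[str]:
    """Validate an initialize result and return the server's capability names."""
    version = result.get("protocolVersion")
    if version not in SUPPORTED_VERSIONS:
        raise RuntimeError(f"Unsupported protocol version: {version!r}")
    return set(result.get("capabilities", {}))


# Invented sample data, shaped like an initialize result.
sample = {
    "protocolVersion": "2025-03-26",
    "capabilities": {"tools": {"listChanged": True}},
    "serverInfo": {"name": "example-server", "version": "0.1.0"},
}
caps = check_handshake(sample)
if "tools" not in caps:
    raise RuntimeError("Server does not expose any tools")
print(sorted(caps))
```

Failing fast here gives a clearer error than a later `tools/list` call that the server rejects.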

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])

    server_info = await initialize_client(client)
    print(f"Connected to: {server_info['serverInfo']['name']}")

    tools = await client.send_request("tools/list")
    for tool in tools.get("tools", []):
        print(f"  - {tool['name']}: {tool['description']}")

    result = await client.send_request("tools/call", {
        "name": "search_docs",
        "arguments": {"query": "MCP protocol parsing"},
    })
    print(f"Result: {result}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 2: SSE Transport Implementation

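Problem: The stdio transport only works for local child processes, but production environments usually need network connections to remote MCP Servers. HTTP with SSE (Server-Sent Events) is the remote transport defined in the 2024-11-05 revision of the protocol. Later revisions replace it with Streamable HTTP, but servers written against the older revision still expose the SSE endpoints shown here.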

Solution: Implement a complete SSE transport layer including connection management, event parsing, message routing, and auto-reconnection.

SSE Transport Architecture

┌──────────────┐     POST /messages     ┌──────────────┐
│  MCP Client  │ ──────────────────────> │  MCP Server  │
│              │                          │              │
│  SSE Handler │ <──── GET /sse ──────── │  SSE Endpoint│
│              │     EventStream          │              │
└──────────────┘                          └──────────────┘
     │                                          │
     │  1. GET /sse to establish SSE connection  │
     │  2. Receive endpoint event for POST URL   │
     │  3. POST /messages to send JSON-RPC reqs  │
     │  4. Receive JSON-RPC responses via SSE    │
     └──────────────────────────────────────────┘
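
Steps 2 and 4 both depend on parsing the event stream correctly. The following is a simplified sketch of that parsing as a pure function over already-received text: it handles only the `event` and `data` fields, and the `session_id` query parameter in the sample is a made-up value, not something the protocol mandates.

```python
import json
from dataclasses import dataclass


@dataclass
class ParsedEvent:
    event: str = "message"
    data: str = ""


def parse_sse(text: str) -> list[ParsedEvent]:
    """Split an SSE stream into events; a blank line terminates each event."""
    events: list[ParsedEvent] = []
    event_type, data_lines = "message", []
    for line in text.splitlines():
        if line == "":
            if data_lines:
                events.append(ParsedEvent(event_type, "\n".join(data_lines)))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip at most one leading space
        if name == "event":
            event_type = value
        elif name == "data":
            data_lines.append(value)
    return events


stream = (
    "event: endpoint\n"
    "data: /messages?session_id=abc\n"
    "\n"
    ": keep-alive\n"
    "event: message\n"
    'data: {"jsonrpc": "2.0", "result": {}, "id": 1}\n'
    "\n"
)
events = parse_sse(stream)
print(events[0].event, events[0].data)
print(json.loads(events[1].data)["id"])
```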

Complete Implementation

import asyncio
import json
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
import httpx

logger = logging.getLogger(__name__)


@dataclass
class SseEvent:
    event: str = "message"
    data: str = ""
    id: str | None = None
    retry: int | None = None


class SseTransport:
    def __init__(
        self,
        server_url: str,
        reconnect_interval: float = 5.0,
        max_reconnect_attempts: int = 10,
        headers: dict[str, str] | None = None,
    ):
        self._server_url = server_url.rstrip("/")
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_attempts = max_reconnect_attempts
        self._headers = headers or {}
        self._message_endpoint: str | None = None
        self._session: httpx.AsyncClient | None = None
        self._running = False
        self._reconnect_count = 0
        self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
        self._last_event_id: str | None = None

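    async def connect(self, endpoint_timeout: float = 10.0):
        self._session = httpx.AsyncClient(
            headers={**self._headers, "Accept": "text/event-stream"},
            timeout=httpx.Timeout(30.0, read=300.0),
        )
        self._running = True
        # Keep a reference so the listener task is not garbage-collected.
        self._listener_task = asyncio.create_task(self._sse_listener())

        # send_message() needs the POST URL announced by the server's
        # "endpoint" event, so block here until it has arrived.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + endpoint_timeout
        while self._message_endpoint is None:
            if not self._running or loop.time() > deadline:
                await self.close()
                raise ConnectionError("SSE server did not announce a message endpoint")
            await asyncio.sleep(0.05)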

    async def _sse_listener(self):
        while self._running:
            try:
                sse_url = f"{self._server_url}/sse"
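                # Per the SSE specification, the resume position travels in the
                # Last-Event-ID request header, not in the query string.
                headers = {}
                if self._last_event_id:
                    headers["Last-Event-ID"] = self._last_event_id

                async with self._session.stream("GET", sse_url, headers=headers) as response: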
                    response.raise_for_status()
                    self._reconnect_count = 0

                    current_event = SseEvent()
                    async for line in response.aiter_lines():
                        if not self._running:
                            break

                        if line.startswith(":"):
                            continue

                        if line == "":
                            if current_event.data:
                                await self._dispatch_event(current_event)
                            current_event = SseEvent()
                            continue

                        if ":" in line:
                            field_name, _, field_value = line.partition(":")
                            # The SSE specification strips at most one leading space.
                            if field_value.startswith(" "):
                                field_value = field_value[1:]

                            if field_name == "event":
                                current_event.event = field_value
                            elif field_name == "data":
                                if current_event.data:
                                    current_event.data += "\n"
                                current_event.data += field_value
                            elif field_name == "id":
                                current_event.id = field_value
                                self._last_event_id = field_value
                            elif field_name == "retry":
                                try:
                                    current_event.retry = int(field_value)
                                except ValueError:
                                    pass

            except httpx.HTTPStatusError as e:
                logger.error(f"SSE HTTP error: {e.response.status_code}")
            except httpx.RequestError as e:
                logger.error(f"SSE connection error: {e}")
            except Exception as e:
                logger.error(f"SSE unexpected error: {e}")

            if self._running:
                self._reconnect_count += 1
                if self._reconnect_count > self._max_reconnect_attempts:
                    logger.error("Max reconnect attempts reached, stopping")
                    self._running = False
                    break
                wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
                logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
                await asyncio.sleep(wait)

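    async def _dispatch_event(self, event: SseEvent):
        if event.event == "endpoint":
            # The endpoint may be a path or an absolute URL, so resolve it
            # against the SSE URL instead of concatenating strings.
            from urllib.parse import urljoin
            self._message_endpoint = urljoin(f"{self._server_url}/sse", event.data)
            logger.info(f"Message endpoint: {self._message_endpoint}")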

        handlers = self._event_handlers.get(event.event, [])
        for handler in handlers:
            try:
                if event.event == "endpoint":
                    await handler(event.data)
                else:
                    message = json.loads(event.data)
                    await handler(message)
            except Exception as e:
                logger.error(f"Event handler error: {e}")

    async def send_message(self, message: dict[str, Any]) -> None:
        if not self._message_endpoint:
            raise RuntimeError("SSE transport not initialized, endpoint not received")
        if not self._session:
            raise RuntimeError("SSE session not connected")

        response = await self._session.post(
            self._message_endpoint,
            json=message,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()

    def on_event(self, event_type: str, handler: Callable):
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    async def close(self):
        self._running = False
        if self._session:
            await self._session.aclose()
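
The reconnect loop in `_sse_listener` doubles its wait after each failed attempt and caps it at 60 seconds. The sketch below reproduces that schedule as a standalone function so the resulting delays are easy to inspect. The `with_jitter` helper is an optional addition that the transport above does not use; it is shown only as one common way to keep many clients from reconnecting in lockstep.

```python
import random


def backoff_delays(base: float = 5.0, cap: float = 60.0, attempts: int = 10) -> list[float]:
    """Delay before each reconnect attempt: base * 2^(n-1), capped at `cap`."""
    return [min(base * (2 ** (n - 1)), cap) for n in range(1, attempts + 1)]


def with_jitter(delay: float, spread: float = 0.2) -> float:
    """Randomize a delay by +/- `spread` (a fraction of the delay)."""
    return delay * random.uniform(1 - spread, 1 + spread)


print(backoff_delays(attempts=6))  # [5.0, 10.0, 20.0, 40.0, 60.0, 60.0]
print(sum(backoff_delays()))       # 435.0
```

With the default settings, the ten attempts add up to 435 seconds of waiting before the transport gives up, which is worth knowing when choosing `max_reconnect_attempts`.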

Integration Usage

class McpSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._request_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._transport.on_event("message", self._on_message)

    async def connect(self):
        await self._transport.connect()

    async def _on_message(self, message: dict):
        if "id" in message:
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
                else:
                    future.set_result(message.get("result"))

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        # Capture the ID locally: self._request_id can advance while this
        # coroutine is suspended if other requests are sent concurrently.
        self._request_id += 1
        request_id = self._request_id
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future

        try:
            await self._transport.send_message(request)
            return await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            raise McpTimeoutError(f"Request {method} timed out") from None
        finally:
            self._pending.pop(request_id, None)

    async def initialize(self) -> dict:
        result = await self.send_request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
        })
        await self._transport.send_message({
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        })
        return result

    async def close(self):
        await self._transport.close()


async def main():
    client = McpSseClient("http://localhost:8080")
    await client.connect()
    info = await client.initialize()
    print(f"Connected: {info}")

    tools = await client.send_request("tools/list")
    print(f"Available tools: {tools}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 3: Tool Discovery and Dynamic Registration

Problem: MCP Server tool lists are dynamic. Clients need to auto-discover tools and convert them to LLM-compatible function calling formats, while supporting tool change notifications.

Solution: Implement tool discovery, schema conversion, dynamic registration, and change listening.

Tool Discovery Architecture

┌──────────────┐  tools/list   ┌──────────────┐
│  MCP Client  │ ────────────> │  MCP Server  │
│              │ <──────────── │              │
│  Tool        │  Tool Schema  │  Tool        │
│  Registry    │               │  Definitions │
│              │  tools/       │              │
│  ┌────────┐  │  list_changed │              │
│  │ OpenAI │  │ <──────────── │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Claude │  │               │              │
│  │ Format │  │               │              │
│  ├────────┤  │               │              │
│  │ Gemini │  │               │              │
│  │ Format │  │               │              │
│  └────────┘  │               │              │
└──────────────┘               └──────────────┘

Complete Implementation

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class McpTool:
    name: str
    description: str
    input_schema: dict[str, Any]
    annotations: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mcp(cls, data: dict) -> "McpTool":
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            input_schema=data.get("inputSchema", {}),
            annotations=data.get("annotations", {}),
        )


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, McpTool] = {}
        self._change_handlers: list[Callable] = []

    def register(self, tool: McpTool):
        self._tools[tool.name] = tool
        logger.info(f"Registered tool: {tool.name}")
        self._notify_change("registered", tool)

    def unregister(self, name: str):
        if name in self._tools:
            tool = self._tools.pop(name)
            logger.info(f"Unregistered tool: {name}")
            self._notify_change("unregistered", tool)

    def get(self, name: str) -> McpTool | None:
        return self._tools.get(name)

    def list_tools(self) -> list[McpTool]:
        return list(self._tools.values())

    def on_change(self, handler: Callable):
        self._change_handlers.append(handler)

    def _notify_change(self, action: str, tool: McpTool):
        for handler in self._change_handlers:
            try:
                handler(action, tool)
            except Exception as e:
                logger.error(f"Change handler error: {e}")

    def to_openai_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                },
            })
        return result

    def to_claude_tools(self) -> list[dict]:
        result = []
        for tool in self._tools.values():
            result.append({
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema,
            })
        return result

    def to_gemini_tools(self) -> list[dict]:
        # Only flat schemas are handled here: nested objects and array item
        # types are not converted.
        result = []
        for tool in self._tools.values():
            params = tool.input_schema.get("properties", {})
            required = tool.input_schema.get("required", [])

            properties = {}
            for param_name, param_schema in params.items():
                properties[param_name] = {
                    "type": self._json_type_to_gemini(param_schema.get("type", "string")),
                    # Keep the description: the model relies on it to fill in the argument.
                    "description": param_schema.get("description", ""),
                }

            result.append({
                "name": tool.name,
                "description": tool.description,
                "parameters": {
                    "type": "OBJECT",
                    "properties": properties,
                    "required": [name for name in required if name in properties],
                },
            })
        return result

    @staticmethod
    def _json_type_to_gemini(json_type: str) -> str:
        mapping = {
            "string": "STRING",
            "integer": "INTEGER",
            "number": "NUMBER",
            "boolean": "BOOLEAN",
            "array": "ARRAY",
            "object": "OBJECT",
        }
        return mapping.get(json_type, "STRING")


class ToolDiscovery:
    def __init__(self, client):
        self._client = client
        self._registry = ToolRegistry()
        self._client.on_notification(
            "notifications/tools/list_changed",
            self._on_tools_changed,
        )

    @property
    def registry(self) -> ToolRegistry:
        return self._registry

    async def discover(self) -> list[McpTool]:
        result = await self._client.send_request("tools/list")
        tools = []
        for tool_data in result.get("tools", []):
            tool = McpTool.from_mcp(tool_data)
            self._registry.register(tool)
            tools.append(tool)
        logger.info(f"Discovered {len(tools)} tools")
        return tools

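    async def _on_tools_changed(self, params: dict):
        logger.info("Tools list changed, re-discovering...")
        # Go through unregister() so change handlers also hear about removed
        # tools, instead of clearing the registry's private dict.
        for tool in self._registry.list_tools():
            self._registry.unregister(tool.name)
        await self.discover()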

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        tool = self._registry.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")

        result = await self._client.send_request("tools/call", {
            "name": name,
            "arguments": arguments,
        })

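        contents = result.get("content", [])
        text_parts = []
        for content in contents:
            if content.get("type") == "text":
                text_parts.append(content["text"])
            elif content.get("type") == "image":
                text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
            elif content.get("type") == "resource":
                text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")

        text = "\n".join(text_parts) if text_parts else str(result)
        # A tool that fails at runtime still returns a normal result, flagged
        # with isError, so surface it as an exception for callers to handle.
        if result.get("isError"):
            raise RuntimeError(f"Tool '{name}' reported an error: {text}")
        return text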
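
When a tools/list_changed notification arrives, a client often wants to know exactly what changed rather than rebuilding everything. The sketch below diffs two snapshots of tool definitions keyed by name. It is a hypothetical helper, not part of the classes above, and the sample tool data is invented.

```python
from typing import Any


def diff_tools(
    old: dict[str, dict[str, Any]],
    new: dict[str, dict[str, Any]],
) -> dict[str, list[str]]:
    """Compare two name -> definition snapshots and report what changed."""
    added = sorted(new.keys() - old.keys())
    removed = sorted(old.keys() - new.keys())
    changed = sorted(name for name in old.keys() & new.keys() if old[name] != new[name])
    return {"added": added, "removed": removed, "changed": changed}


# Invented snapshots taken before and after a list_changed notification.
before = {
    "search_docs": {"description": "Search"},
    "translate_text": {"description": "Translate"},
}
after = {
    "search_docs": {"description": "Search v2"},
    "extract_summary": {"description": "Summarize"},
}
print(diff_tools(before, after))
```

A registry could use such a diff to emit one precise event per tool instead of an unregister/register pair for every entry.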

Dynamic Registration Usage

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    tools = await discovery.discover()

    print(f"Discovered {len(tools)} tools:")
    for tool in tools:
        print(f"  - {tool.name}: {tool.description}")

    openai_tools = discovery.registry.to_openai_tools()
    print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2)}")

    claude_tools = discovery.registry.to_claude_tools()
    print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2)}")

    result = await discovery.call_tool("search_docs", {"query": "MCP protocol parsing"})
    print(f"\nTool result: {result}")

    await client.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Pattern 4: Multi-Tool Sequential Orchestration

Problem: AI Agents need to call multiple tools in sequence, where each tool's input depends on the previous tool's output. How do you elegantly manage such sequential dependency chains?

Solution: Implement a sequential orchestrator with dependency injection, intermediate result passing, and conditional branching.

Sequential Orchestration Flow

User Input: "Search MCP docs and translate summary"
    │
    ▼
┌──────────┐    result     ┌──────────┐    result     ┌──────────┐
│ Tool 1   │ ────────────> │ Tool 2   │ ────────────> │ Tool 3   │
│ Search   │               │ Extract  │               │ Translate│
│ Docs     │               │ Summary  │               │ Content  │
└──────────┘               └──────────┘               └──────────┘
    │                          │                          │
    │ $search_result           │ $summary_text            │ $translated
    ▼                          ▼                          ▼
 Context Store            Context Store              Context Store
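
The `$name` references in the diagram are placeholders that get replaced with values from the context store before each tool call. Here is a minimal, self-contained sketch of that substitution. It assumes step outputs are stored as nested dictionaries under a `steps` key, which is a choice made for this example, and the `lookup` and `resolve` names are hypothetical.

```python
from typing import Any


def lookup(context: dict[str, Any], path: str) -> Any:
    """Walk a dotted path through nested dicts and lists; None if anything is missing."""
    current: Any = context
    for key in path.split("."):
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return None
    return current


def resolve(arguments: dict[str, Any], context: dict[str, Any]) -> dict[str, Any]:
    """Replace every "$dotted.path" string value with the value it points to."""
    return {
        key: lookup(context, value[1:]) if isinstance(value, str) and value.startswith("$") else value
        for key, value in arguments.items()
    }


# Invented context, as it might look after a search step has run.
context = {
    "user_query": "MCP protocol",
    "steps": {"search": {"hits": ["doc-a", "doc-b"]}},
}
resolved = resolve(
    {"query": "$user_query", "first_hit": "$steps.search.hits.0", "limit": 5},
    context,
)
print(resolved)  # {'query': 'MCP protocol', 'first_hit': 'doc-a', 'limit': 5}
```

Literal values such as `limit` pass through untouched, and a reference to a missing key resolves to `None`, so a step can use its condition to skip itself when an earlier result is absent.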

Complete Implementation

import asyncio
import json
import re
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class StepResult:
    step_name: str
    tool_name: str
    input_args: dict[str, Any]
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


@dataclass
class PipelineStep:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    condition: Callable[[dict[str, Any]], bool] | None = None
    timeout: float = 30.0
    retry_count: int = 0

    def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
        if callable(self.arguments):
            return self.arguments(context)
        resolved = {}
        for key, value in self.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


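def _deep_get(data: dict, path: str) -> Any:
    # The orchestrator stores step outputs under flat keys such as
    # "steps.search", so try the whole path as a key before walking it
    # segment by segment.
    if path in data:
        return data[path]
    keys = path.split(".")
    current = data
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else None
        else:
            return None
    return current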


class SequentialOrchestrator:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery
        self._steps: list[PipelineStep] = []
        self._context: dict[str, Any] = {}

    def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
        self._steps.append(step)
        return self

    def set_context(self, key: str, value: Any):
        self._context[key] = value

    async def execute(self) -> list[StepResult]:
        results = []
        context = dict(self._context)

        for step in self._steps:
            if step.condition and not step.condition(context):
                logger.info(f"Skipping step '{step.name}': condition not met")
                continue

            resolved_args = step.resolve_arguments(context)
            logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= step.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(step.tool_name, resolved_args),
                        timeout=step.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000

                    context[f"steps.{step.name}"] = output
                    context["last_result"] = output

                    results.append(StepResult(
                        step_name=step.name,
                        tool_name=step.tool_name,
                        input_args=resolved_args,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    ))
                    break

                except Exception as e:
                    # str() of a timeout error is empty, so include the type name.
                    last_error = f"{type(e).__name__}: {e}"
                    attempt += 1
                    if attempt <= step.retry_count:
                        logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
                        await asyncio.sleep(1.0 * attempt)
            else:
                duration = (time.monotonic() - start) * 1000
                results.append(StepResult(
                    step_name=step.name,
                    tool_name=step.tool_name,
                    input_args=resolved_args,
                    output=None,
                    success=False,
                    error=last_error,
                    duration_ms=duration,
                ))
                break

        return results

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = SequentialOrchestrator(discovery)
    orchestrator.set_context("user_query", "MCP protocol parsing and Python client integration")

    orchestrator.add_step(PipelineStep(
        name="search",
        tool_name="search_docs",
        arguments={"query": "$user_query", "limit": 5},
    ))

    orchestrator.add_step(PipelineStep(
        name="extract",
        tool_name="extract_summary",
        arguments={"text": "$steps.search"},
        condition=lambda ctx: bool(ctx.get("steps.search")),
    ))

    orchestrator.add_step(PipelineStep(
        name="translate",
        tool_name="translate_text",
        arguments={"text": "$steps.extract", "target_lang": "en"},
        condition=lambda ctx: bool(ctx.get("steps.extract")),
    ))

    results = await orchestrator.execute()

    for r in results:
        status = "OK" if r.success else "FAIL"
        print(f"[{status}] [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
        if r.error:
            print(f"  Error: {r.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 5: Multi-Tool Parallel Orchestration

Problem: When multiple tool calls have no dependencies between them, sequential execution wastes time. How do you safely call multiple MCP tools in parallel and aggregate results?

Solution: Implement a parallel orchestrator based on asyncio with concurrency control, result aggregation, and DAG dependency execution.

Parallel Orchestration Architecture

                    ┌──────────┐
                    │  Request  │
                    └────┬─────┘
                         │
              ┌──────────┼──────────┐
              │          │          │
              ▼          ▼          ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ Tool A   │ │ Tool B   │ │ Tool C   │
        │ Search   │ │ Query DB │ │ Call API │
        └────┬─────┘ └────┬─────┘ └────┬─────┘
             │             │             │
             └──────────┬──┘────────────┘
                        │
                        ▼
                 ┌─────────────┐
                 │  Aggregate   │
                 └─────────────┘
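
The fan-out and aggregate shape in the diagram can be reduced to a few lines of asyncio. The sketch below is a minimal illustration rather than the orchestrator itself: fake_tool is a hypothetical stand-in for a real tools/call round trip, and fan_out is a helper name introduced only for this example. It shows the two ingredients that matter here: a Semaphore that caps concurrency, and gather(return_exceptions=True) so a failed call comes back as a value next to the successful ones.

```python
import asyncio
from typing import Any, Awaitable, Callable


# Hypothetical stand-in for an MCP tool call; a real client would await tools/call here.
async def fake_tool(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "call_api":
        raise RuntimeError("upstream unavailable")
    return f"{name}: done"


async def fan_out(
    calls: dict[str, Callable[[], Awaitable[Any]]],
    max_concurrency: int = 2,
) -> dict[str, Any]:
    """Run independent calls concurrently, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory: Callable[[], Awaitable[Any]]) -> Any:
        async with semaphore:
            return await factory()

    names = list(calls)
    # return_exceptions=True collects failures as values instead of raising the first one.
    outcomes = await asyncio.gather(
        *(guarded(calls[name]) for name in names), return_exceptions=True
    )
    return dict(zip(names, outcomes))


async def demo() -> dict[str, Any]:
    return await fan_out({
        "search": lambda: fake_tool("search", 0.01),
        "query_db": lambda: fake_tool("query_db", 0.01),
        "call_api": lambda: fake_tool("call_api", 0.01),
    })


if __name__ == "__main__":
    for name, outcome in asyncio.run(demo()).items():
        print(name, "->", repr(outcome))
```

Passing zero-argument factories instead of coroutine objects means nothing starts until a semaphore slot is free, which is the same reason the orchestrator below acquires the semaphore before resolving arguments.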

Complete Implementation

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class ParallelTask:
    name: str
    tool_name: str
    arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
    depends_on: list[str] = field(default_factory=list)
    timeout: float = 30.0
    retry_count: int = 0


@dataclass
class TaskResult:
    name: str
    tool_name: str
    output: Any
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0


class ParallelOrchestrator:
    def __init__(self, discovery: ToolDiscovery, max_concurrency: int = 5):
        self._discovery = discovery
        self._max_concurrency = max_concurrency
        self._tasks: list[ParallelTask] = []

    def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
        self._tasks.append(task)
        return self

    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        context = dict(initial_context or {})
        results: dict[str, TaskResult] = {}
        semaphore = asyncio.Semaphore(self._max_concurrency)
        completed = set()
        remaining = list(self._tasks)

        while remaining:
            ready = [
                t for t in remaining
                if all(dep in completed for dep in t.depends_on)
            ]

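            if not ready:
                # Nothing can run: the remaining tasks form a cycle or depend on an unknown task.
                logger.error("Unresolvable dependencies for tasks: %s", [t.name for t in remaining])
                break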

            coros = [self._execute_task(task, context, results, semaphore) for task in ready]
            task_results = await asyncio.gather(*coros, return_exceptions=True)

            for task, result in zip(ready, task_results):
                # gather(return_exceptions=True) can also hand back CancelledError, a BaseException
                if isinstance(result, BaseException):
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=str(result),
                    )
                else:
                    results[task.name] = result
                    context[f"tasks.{task.name}"] = result.output

                completed.add(task.name)
                remaining.remove(task)

        return results

    async def _execute_task(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
        semaphore: asyncio.Semaphore,
    ) -> TaskResult:
        async with semaphore:
            resolved_args = self._resolve_arguments(task, context, results)

            import time
            start = time.monotonic()

            attempt = 0
            last_error = None
            while attempt <= task.retry_count:
                try:
                    output = await asyncio.wait_for(
                        self._discovery.call_tool(task.tool_name, resolved_args),
                        timeout=task.timeout,
                    )
                    duration = (time.monotonic() - start) * 1000
                    return TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=output,
                        success=True,
                        duration_ms=duration,
                    )
                except Exception as e:
                    last_error = str(e)
                    attempt += 1
                    if attempt <= task.retry_count:
                        await asyncio.sleep(0.5 * attempt)

            duration = (time.monotonic() - start) * 1000
            return TaskResult(
                name=task.name,
                tool_name=task.tool_name,
                output=None,
                success=False,
                error=last_error,
                duration_ms=duration,
            )

    def _resolve_arguments(
        self,
        task: ParallelTask,
        context: dict[str, Any],
        results: dict[str, TaskResult],
    ) -> dict[str, Any]:
        if callable(task.arguments):
            return task.arguments(context)

        resolved = {}
        for key, value in task.arguments.items():
            if isinstance(value, str) and value.startswith("$"):
                ref_path = value[1:]
                if ref_path.startswith("tasks."):
                    task_name = ref_path.split(".")[1]
                    if task_name in results:
                        resolved[key] = results[task_name].output
                else:
                    resolved[key] = _deep_get(context, ref_path)
            else:
                resolved[key] = value
        return resolved


class DagOrchestrator(ParallelOrchestrator):
    def validate_dag(self) -> list[str]:
        errors = []
        task_names = {t.name for t in self._tasks}
        for task in self._tasks:
            for dep in task.depends_on:
                if dep not in task_names:
                    errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")

        visited = set()
        path = set()

        def has_cycle(name: str) -> bool:
            visited.add(name)
            path.add(name)
            task = next((t for t in self._tasks if t.name == name), None)
            if task:
                for dep in task.depends_on:
                    if dep in path:
                        return True
                    if dep not in visited and has_cycle(dep):
                        return True
            path.remove(name)
            return False

        for task in self._tasks:
            if task.name not in visited:
                if has_cycle(task.name):
                    errors.append("Circular dependency detected in DAG")
                    break

        return errors

Usage Example

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)

    orchestrator.add_task(ParallelTask(
        name="search_web",
        tool_name="web_search",
        arguments={"query": "MCP protocol 2026"},
    ))

    orchestrator.add_task(ParallelTask(
        name="search_docs",
        tool_name="search_docs",
        arguments={"query": "Model Context Protocol Python"},
    ))

    orchestrator.add_task(ParallelTask(
        name="query_db",
        tool_name="query_database",
        arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
    ))

    orchestrator.add_task(ParallelTask(
        name="merge_results",
        tool_name="merge_and_deduplicate",
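        # A callable builds the arguments from the shared context once all dependencies have finished.
        # Outputs of failed dependencies are absent from the context, so .get() yields None for them.
        arguments=lambda ctx: {"sources": [
            ctx.get("tasks.search_web"),
            ctx.get("tasks.search_docs"),
            ctx.get("tasks.query_db"),
        ]},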
        depends_on=["search_web", "search_docs", "query_db"],
    ))

    results = await orchestrator.execute({"user_query": "MCP client integration"})

    for name, result in results.items():
        status = "OK" if result.success else "FAIL"
        print(f"[{status}] [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
        if result.error:
            print(f"  Error: {result.error}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())
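
To see what the dependency loop in execute() does with a task graph, it can help to compute the execution "waves" ahead of time without calling any tools. The following is a small sketch under that assumption: execution_waves is a hypothetical helper, not part of the orchestrator above, and it takes a plain mapping of task name to dependency names. It mirrors the same rule (a task is ready once all of its dependencies are done) and fails loudly on unknown dependencies or cycles instead of logging and stopping.

```python
def execution_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves; every task in a wave depends only on earlier waves.

    Raises ValueError on unknown dependencies or on a cycle.
    """
    unknown = {d for ds in deps.values() for d in ds} - set(deps)
    if unknown:
        raise ValueError(f"Unknown dependencies: {sorted(unknown)}")

    done: set[str] = set()
    remaining = dict(deps)
    waves: list[list[str]] = []
    while remaining:
        # Sorted only to make the output deterministic; tasks in a wave run concurrently.
        ready = sorted(
            name for name, ds in remaining.items() if all(d in done for d in ds)
        )
        if not ready:
            raise ValueError(f"Circular dependency among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


if __name__ == "__main__":
    graph = {
        "search_web": [],
        "search_docs": [],
        "query_db": [],
        "merge_results": ["search_web", "search_docs", "query_db"],
    }
    for number, wave in enumerate(execution_waves(graph), start=1):
        print(f"wave {number}: {wave}")
```

For the usage example above this yields two waves: the three independent lookups first, then merge_results on its own.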

Pattern 6: Error Handling and Retry

Problem: MCP clients need robust error handling and retry mechanisms when facing network instability, server overload, or tool execution exceptions.

Solution: Implement layered error handling with exponential backoff retry, circuit breakers, and fallback strategies.

Error Handling Architecture

┌────────────────────────────────────────────┐
│              Error Handling Layer           │
│                                            │
│  ┌──────────┐  ┌──────────┐  ┌─────────┐  │
│  │ Retry    │  │ Circuit  │  │ Fallback│  │
│  │ Strategy │  │ Breaker  │  │ Handler │  │
│  └──────────┘  └──────────┘  └─────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │        Error Classification          │  │
│  │  Transient / Permanent / Unknown     │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘

Complete Implementation

import asyncio
import logging
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: list[int] = field(default_factory=lambda: [
        -32603,  # Internal error
        -32000,  # Server error
    ])


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig | None = None):
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0
        self._half_open_calls = 0

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.HALF_OPEN:
            if self._half_open_calls < self._config.half_open_max_calls:
                self._half_open_calls += 1
                return True
            return False
        return False

    def record_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._success_count = 0
        else:
            self._failure_count = 0

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.OPEN
            self._success_count = 0
        elif self._failure_count >= self._config.failure_threshold:
            self._state = CircuitState.OPEN


class ResilientMcpClient:
    def __init__(
        self,
        client,
        retry_config: RetryConfig | None = None,
        circuit_config: CircuitBreakerConfig | None = None,
    ):
        self._client = client
        self._retry_config = retry_config or RetryConfig()
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_config or CircuitBreakerConfig()
        self._fallback_handlers: dict[str, Callable] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self._circuit_breakers:
            self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
        return self._circuit_breakers[server_name]

    def register_fallback(self, tool_name: str, handler: Callable):
        self._fallback_handlers[tool_name] = handler

    def classify_error(self, error: Exception) -> ErrorCategory:
        if isinstance(error, (ConnectionError, asyncio.TimeoutError, OSError)):
            return ErrorCategory.TRANSIENT
        if isinstance(error, McpError):
            if error.code in self._retry_config.retryable_errors:
                return ErrorCategory.TRANSIENT
            if error.code in [-32600, -32601, -32602]:
                return ErrorCategory.PERMANENT
        return ErrorCategory.UNKNOWN

    async def call_with_retry(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        server_name: str = "default",
    ) -> Any:
        breaker = self._get_breaker(server_name)

        if not breaker.allow_request():
            fallback = self._fallback_handlers.get(tool_name)
            if fallback:
                logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
                return await fallback(arguments)
            raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")

        last_error = None
        for attempt in range(1, self._retry_config.max_attempts + 1):
            try:
                result = await self._client.call_tool(tool_name, arguments)
                breaker.record_success()
                return result
            except Exception as e:
                last_error = e
                category = self.classify_error(e)

                if category == ErrorCategory.PERMANENT:
                    breaker.record_failure()
                    raise

                if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
                    delay = min(
                        self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
                        self._retry_config.max_delay,
                    )
                    if self._retry_config.jitter:
                        import random
                        delay *= (0.5 + random.random() * 0.5)

                    logger.warning(
                        f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
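                else:
                    # Unknown errors and the final transient failure are not retried:
                    # record the failure and stop, rather than looping again with no delay.
                    breaker.record_failure()
                    break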

        if fallback := self._fallback_handlers.get(tool_name):
            logger.warning(f"All retries exhausted for '{tool_name}', using fallback")
            return await fallback(arguments)

        raise last_error


class CircuitOpenError(Exception):
    pass

Usage Example

import json


async def fallback_search(arguments: dict) -> str:
    return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})


async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    resilient = ResilientMcpClient(
        discovery,
        retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
        circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
    )
    resilient.register_fallback("search_docs", fallback_search)

    try:
        result = await resilient.call_with_retry("search_docs", {"query": "MCP protocol parsing"})
        print(f"Result: {result}")
    except CircuitOpenError as e:
        print(f"Service unavailable: {e}")
    except Exception as e:
        print(f"Failed: {e}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())
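
The OPEN to HALF_OPEN transition depends on elapsed time, which makes it awkward to unit-test without sleeping. One option, sketched here as an assumption rather than as part of the implementation above, is to inject the clock. MiniBreaker is a deliberately reduced, hypothetical breaker (no half-open call budget) that exists only to show the state walk.

```python
import time
from typing import Callable


class MiniBreaker:
    """Reduced circuit breaker with an injectable clock (illustrative only)."""

    def __init__(
        self,
        failure_threshold: int = 2,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: float | None = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        # Any success closes the breaker in this reduced version.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        # A failed probe in half_open reopens immediately; otherwise wait for the threshold.
        if self.state == "half_open" or self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


def walk_states() -> list[str]:
    now = [0.0]  # fake clock the test controls
    breaker = MiniBreaker(failure_threshold=2, recovery_timeout=30.0, clock=lambda: now[0])
    breaker.record_failure()
    breaker.record_failure()
    states = [breaker.state]        # threshold reached
    now[0] = 31.0
    states.append(breaker.state)    # recovery timeout elapsed
    breaker.record_success()
    states.append(breaker.state)    # probe succeeded
    return states


if __name__ == "__main__":
    print(walk_states())
```

The same idea applies to the CircuitBreaker class above: accepting a clock callable in the constructor would let tests advance time by assignment.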

Pattern 7: Streaming Response Processing

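Problem: Long-running tool calls need real-time progress feedback. The MCP protocol supports this through notifications: while a call is still running, the server can send notifications/progress and notifications/message over the active transport (an SSE stream, for example), and the final result arrives as the normal response to the request.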

Solution: Implement a streaming response handler with progress notifications, partial result aggregation, and cancellation support.

Streaming Response Architecture

┌──────────┐  tools/call   ┌──────────┐
│  Client  │ ────────────> │  Server  │
│          │               │          │
│  Stream  │ <── notify ── │  Tool    │
│  Handler │   progress    │  Running │
│          │ <── notify ── │          │
│          │   progress    │          │
│          │ <── result ── │          │
│          │   complete    │          │
└──────────┘               └──────────┘

Complete Implementation

import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum

logger = logging.getLogger(__name__)


class StreamEventType(Enum):
    PROGRESS = "progress"
    PARTIAL_RESULT = "partial_result"
    COMPLETE = "complete"
    ERROR = "error"
    LOG = "log"


@dataclass
class StreamEvent:
    type: StreamEventType
    data: Any
    progress: float | None = None
    message: str | None = None
    # time.monotonic works with or without a running event loop
    timestamp: float = field(default_factory=time.monotonic)


@dataclass
class StreamResult:
    events: list[StreamEvent] = field(default_factory=list)
    final_result: Any = None
    success: bool = True
    error: str | None = None
    total_duration_ms: float = 0.0


class StreamHandler:
    def __init__(self):
        self._progress_handlers: list[Callable[[StreamEvent], Any]] = []
        self._partial_handlers: list[Callable[[StreamEvent], Any]] = []
        self._complete_handlers: list[Callable[[StreamEvent], Any]] = []
        self._error_handlers: list[Callable[[StreamEvent], Any]] = []
        self._log_handlers: list[Callable[[StreamEvent], Any]] = []

    def on_progress(self, handler: Callable[[StreamEvent], Any]):
        self._progress_handlers.append(handler)

    def on_partial_result(self, handler: Callable[[StreamEvent], Any]):
        self._partial_handlers.append(handler)

    def on_complete(self, handler: Callable[[StreamEvent], Any]):
        self._complete_handlers.append(handler)

    def on_error(self, handler: Callable[[StreamEvent], Any]):
        self._error_handlers.append(handler)

    def on_log(self, handler: Callable[[StreamEvent], Any]):
        self._log_handlers.append(handler)

    async def handle_notification(self, message: dict[str, Any]):
        # Expects the full JSON-RPC notification; the payload lives under "params".
        method = message.get("method", "")
        params = message.get("params") or message

        if method == "notifications/progress":
            # In the MCP spec, "progress" and "total" are numbers; "total" and "message" are optional.
            current = params.get("progress", 0)
            total = params.get("total") or 1
            event = StreamEvent(
                type=StreamEventType.PROGRESS,
                data=params,
                progress=current / max(total, 1),
                message=params.get("message"),
            )
            for handler in self._progress_handlers:
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Progress handler error: {e}")

        elif method == "notifications/message":
            # Log payloads may be any JSON value, not only an object with a "text" key.
            level = params.get("level", "info")
            data = params.get("data")
            if isinstance(data, dict):
                text = data.get("text", "")
            else:
                text = "" if data is None else str(data)
            event = StreamEvent(
                type=StreamEventType.LOG,
                data=data,
                message=f"[{level}] {text}",
            )
            for handler in self._log_handlers:
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Log handler error: {e}")


class StreamingMcpClient:
    def __init__(self, client, discovery: ToolDiscovery):
        self._client = client
        self._discovery = discovery
        self._stream_handler = StreamHandler()
        self._active_calls: dict[str, asyncio.Task] = {}

        client.on_notification(
            "notifications/progress",
            self._stream_handler.handle_notification,
        )
        client.on_notification(
            "notifications/message",
            self._stream_handler.handle_notification,
        )

    @property
    def stream(self) -> StreamHandler:
        return self._stream_handler

    async def call_with_stream(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        timeout: float = 120.0,
    ) -> StreamResult:
        import time
        start = time.monotonic()
        result = StreamResult()

        try:
            call_result = await asyncio.wait_for(
                self._discovery.call_tool(tool_name, arguments),
                timeout=timeout,
            )
            result.final_result = call_result
            result.success = True
            event = StreamEvent(type=StreamEventType.COMPLETE, data=call_result)
            handlers = self._stream_handler._complete_handlers
        except asyncio.TimeoutError:
            result.success = False
            result.error = f"Tool call timed out after {timeout}s"
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
            handlers = self._stream_handler._error_handlers
        except Exception as e:
            result.success = False
            result.error = str(e)
            event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
            handlers = self._stream_handler._error_handlers

        result.events.append(event)
        # Fire the callbacks registered through on_complete / on_error once per call.
        # Registering fresh handlers inside this method would accumulate them across calls.
        for handler in handlers:
            try:
                outcome = handler(event)
                if asyncio.iscoroutine(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"Stream handler error: {e}")

        result.total_duration_ms = (time.monotonic() - start) * 1000
        return result

    async def cancel(self, request_id: int | str):
        await self._client.send_notification("notifications/cancelled", {
            "requestId": request_id,
            "reason": "User cancelled",
        })

Usage Example

async def main():
    client = JsonRpcClient(timeout=120.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    await initialize_client(client)

    discovery = ToolDiscovery(client)
    await discovery.discover()

    streaming = StreamingMcpClient(client, discovery)

    streaming.stream.on_progress(lambda e: print(f"  Progress: {e.progress:.0%} - {e.message or ''}"))
    streaming.stream.on_log(lambda e: print(f"  Log: {e.message}"))
    streaming.stream.on_complete(lambda e: print(f"  Complete!"))
    streaming.stream.on_error(lambda e: print(f"  Error: {e.message}"))

    print("Calling long-running tool with streaming...")
    result = await streaming.call_with_stream(
        "batch_process",
        {"items": list(range(100)), "operation": "analyze"},
        timeout=120.0,
    )

    print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
    if result.final_result:
        print(f"Output: {str(result.final_result)[:200]}...")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

5 Common Pitfalls and Solutions

1. Missing initialized Notification After Handshake

Pitfall: After sending the initialize request, you forget to send the notifications/initialized notification. The Server keeps waiting for the handshake to complete, and all subsequent requests time out.

Solution:

async def safe_initialize(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "1.0.0"},
    })
    # Must send initialized notification
    await client.send_notification("notifications/initialized")
    return result

2. Sending Requests Before SSE Endpoint is Ready

Pitfall: After establishing the SSE connection, you immediately send JSON-RPC requests before receiving the endpoint event, causing requests to hit the wrong URL.

Solution:

class SafeSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, data):
        self._endpoint_ready.set()

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
        # Now safe to send requests
        ...

3. Missing required Field in Tool Schema Conversion

Pitfall: When converting MCP tool schemas to OpenAI function calling format, inputSchema is used directly as parameters, but some LLMs require the required field to be present.

Solution:

def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
    # Copy so the registered tool's schema is not mutated; tolerate a missing schema.
    schema = dict(mcp_tool.input_schema or {})
    schema.setdefault("type", "object")
    schema.setdefault("properties", {})
    schema.setdefault("required", [])
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description,
            "parameters": schema,
        },
    }

4. Shared State Race Conditions in Parallel Calls

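Pitfall: Multiple parallel tool calls share the same httpx.AsyncClient and the same dictionary of pending asyncio.Future objects. If request IDs collide or are reused across calls, a response resolves the wrong future and the caller receives another call's result.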

Solution: Use isolated request ID spaces or connection pool isolation for each parallel call:

class IsolatedClientPool:
    def __init__(self, base_url: str, pool_size: int = 5):
        self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
        self._index = 0
        self._lock = asyncio.Lock()

    async def get_client(self) -> McpSseClient:
        async with self._lock:
            client = self._clients[self._index % len(self._clients)]
            self._index += 1
            return client
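
The pool above covers connection isolation. The other option named in the solution, an isolated request ID space, can be sketched as follows. RequestCorrelator is a hypothetical helper introduced for this example: each instance owns its own ID counter and its own pending-future table, so responses that arrive out of order still reach the right caller.

```python
import asyncio
import itertools
from typing import Any


class RequestCorrelator:
    """Owns one ID counter and one pending table; never shared between connections."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}

    def register(self) -> tuple[int, asyncio.Future]:
        # Must be called from inside a running event loop.
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, message: dict[str, Any]) -> bool:
        """Deliver a response to its waiting future; False if nothing was waiting."""
        future = self._pending.pop(message.get("id"), None)
        if future is None or future.done():
            return False
        future.set_result(message.get("result"))
        return True


async def demo() -> list[Any]:
    correlator = RequestCorrelator()
    id_a, future_a = correlator.register()
    id_b, future_b = correlator.register()
    # Responses arrive in the opposite order to the requests.
    correlator.resolve({"jsonrpc": "2.0", "id": id_b, "result": "B"})
    correlator.resolve({"jsonrpc": "2.0", "id": id_a, "result": "A"})
    return [await future_a, await future_b]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the counter only ever increases and entries are popped on delivery, an ID is never reused while its future is still pending.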

5. Confusing Progress Notifications with Final Results

Pitfall: In the SSE channel, progress notifications and final results share the same stream. The client treats progress notifications as final results.

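Solution: Strictly distinguish message types. A response carries an id but no method. A message with a method is either a notification (no id) or a server-initiated request (with id), so it must never resolve a pending future:

async def _handle_sse_message(self, message: dict):
    if "id" in message and "method" not in message:
        # Response: match it to the pending future for that request ID
        future = self._pending.pop(message["id"], None)
        if future and not future.done():
            if "error" in message:
                # Surface JSON-RPC errors instead of resolving with None;
                # substitute the client's own error type here
                future.set_exception(RuntimeError(str(message["error"])))
            else:
                future.set_result(message.get("result"))
    elif "method" in message and "id" not in message:
        # Notification: forward to the stream handler
        await self._stream_handler.handle_notification(message)
    # Server-initiated requests (both id and method) need their own handler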
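
The distinction can also be written as a small pure function, which is easy to test separately from the transport. The sketch below follows the JSON-RPC 2.0 rules (requests have method and id, notifications have method only, responses have id plus result or error). The function name classify_message and the returned labels are choices made for this example.

```python
from typing import Any


def classify_message(message: dict[str, Any]) -> str:
    """Classify a decoded JSON-RPC 2.0 message by which members it carries."""
    has_id = "id" in message
    has_method = "method" in message
    if has_method and has_id:
        return "request"        # server-to-client request, e.g. ping
    if has_method:
        return "notification"   # e.g. notifications/progress
    if has_id and ("result" in message or "error" in message):
        return "response"
    return "invalid"


if __name__ == "__main__":
    samples = [
        {"jsonrpc": "2.0", "id": 1, "result": {"content": []}},
        {"jsonrpc": "2.0", "id": 1, "error": {"code": -32602, "message": "Invalid params"}},
        {"jsonrpc": "2.0", "method": "notifications/progress", "params": {"progress": 1}},
        {"jsonrpc": "2.0", "id": 9, "method": "ping"},
    ]
    for sample in samples:
        print(classify_message(sample))
```

Only messages classified as "response" should ever touch the pending-future table.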

10 Common Error Troubleshooting

| Error Message | Cause | Solution |
| --- | --- | --- |
| MCP Error [-32600]: Invalid Request | Malformed JSON-RPC message, missing required fields | Check that the request has jsonrpc, method, and id fields; use /en/json/format to validate |
| MCP Error [-32601]: Method not found | Called a method the Server doesn't support | Call initialize first to confirm Server capabilities; check method spelling |
| MCP Error [-32602]: Invalid params | Tool parameter type or structure mismatch | Use tools/list to get the correct inputSchema; verify parameters against it |
| Connection refused | MCP Server not running or wrong port | Confirm the Server process is running; check port and URL config |
| SSE connection timeout | Network unreachable or Server not responding to SSE handshake | Check firewall rules; confirm the SSE endpoint is reachable |
| Request timed out after 30s | Tool execution exceeds the client timeout setting | Increase the timeout parameter or use streaming responses for progress |
| Circuit breaker open | Consecutive failures exceed the circuit breaker threshold | Check Server health; wait for the recovery timeout before retrying |
| Tool not found: xxx | Called an unregistered tool name | Re-run tools/list discovery; check tool name case sensitivity |
| Endpoint not received | SSE connection established but no endpoint event received | Confirm the Server sends the endpoint event correctly; check SSE route config |
| JSON decode error in SSE stream | SSE data is not valid JSON | Check the Server-side SSE event format; ensure the data field contains valid JSON |
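
For the three MCP Error rows, a client can turn the numeric code into a label and a retry hint before deciding what to do. The lookup below is a sketch. The code-to-name mapping is from the JSON-RPC 2.0 specification, while the retry flags are assumptions that follow the classification used in Pattern 6 (internal and server errors are treated as transient, request-shape errors as permanent). describe_error is a hypothetical helper name.

```python
# Standard JSON-RPC 2.0 error codes: code -> (label, worth_retrying)
JSONRPC_ERRORS: dict[int, tuple[str, bool]] = {
    -32700: ("Parse error", False),       # retry flag is an assumption
    -32600: ("Invalid Request", False),
    -32601: ("Method not found", False),
    -32602: ("Invalid params", False),
    -32603: ("Internal error", True),
}


def describe_error(code: int) -> tuple[str, bool]:
    """Return (label, worth_retrying) for a JSON-RPC error code."""
    if code in JSONRPC_ERRORS:
        return JSONRPC_ERRORS[code]
    if -32099 <= code <= -32000:
        # Range reserved for implementation-defined server errors;
        # treating the whole range as retryable is an assumption of this sketch.
        return ("Server error", True)
    return ("Application error", False)


if __name__ == "__main__":
    for code in (-32600, -32601, -32602, -32603, -32000, 1001):
        label, retry = describe_error(code)
        print(f"{code}: {label} (retry={retry})")
```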

Advanced Optimization Tips

1. Connection Pool and Multi-Server Management

In production, AI Agents typically need to connect to multiple MCP Servers simultaneously. Use a connection pool to manage multiple client instances:

class McpConnectionPool:
    def __init__(self):
        self._connections: dict[str, Any] = {}
        self._discovery_map: dict[str, ToolDiscovery] = {}

    async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
        client = JsonRpcClient(timeout=30.0)
        await client.connect_stdio(command, args)
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    async def connect_sse(self, name: str, url: str):
        client = McpSseClient(url)
        await client.connect()
        await client.initialize()
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    def get_discovery(self, name: str) -> ToolDiscovery | None:
        return self._discovery_map.get(name)

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        for name, discovery in self._discovery_map.items():
            if discovery.registry.get(tool_name):
                return await discovery.call_tool(tool_name, arguments)
        raise ValueError(f"Tool '{tool_name}' not found in any connected server")

    async def close_all(self):
        for client in self._connections.values():
            await client.close()
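
The call_tool method above scans the servers in insertion order and uses the first one that registers the tool, so a name offered by two servers is silently routed to whichever connected first. One possible refinement, sketched here with plain data rather than live connections, is to build a routing index once after discovery and report collisions explicitly. build_tool_index and its input shape (server name mapped to a list of tool names) are assumptions for this example.

```python
from collections import defaultdict


def build_tool_index(
    tools_by_server: dict[str, list[str]],
) -> tuple[dict[str, str], dict[str, list[str]]]:
    """Map each tool name to one server and list names offered by several servers."""
    owners: dict[str, list[str]] = defaultdict(list)
    for server, tools in tools_by_server.items():
        for tool in tools:
            owners[tool].append(server)
    # First server wins, matching the linear scan, but collisions are now visible.
    index = {tool: servers[0] for tool, servers in owners.items()}
    collisions = {tool: servers for tool, servers in owners.items() if len(servers) > 1}
    return index, collisions


if __name__ == "__main__":
    index, collisions = build_tool_index({
        "docs": ["search_docs", "translate_text"],
        "web": ["web_search", "search_docs"],
    })
    print(index)
    print(collisions)
```

With an index like this, routing becomes a dictionary lookup, and a non-empty collisions mapping can be logged or rejected at connect time.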

2. Tool Call Caching

Cache idempotent tool call results to reduce duplicate requests:

import hashlib
import json
import time
from typing import Any


class ToolCallCache:
    """In-memory TTL cache for the results of idempotent tool calls."""

    def __init__(self, ttl: float = 300.0, max_size: int = 1000):
        self._cache: dict[tuple[str, str], tuple[float, Any]] = {}
        self._ttl = ttl
        self._max_size = max_size

    def _make_key(self, tool_name: str, arguments: dict) -> tuple[str, str]:
        # Keep the tool name in the key so invalidate() can match every entry
        # for a tool; sort_keys makes the digest independent of argument order.
        raw = json.dumps(arguments, sort_keys=True)
        return tool_name, hashlib.sha256(raw.encode()).hexdigest()

    def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
        key = self._make_key(tool_name, arguments)
        if key in self._cache:
            ts, value = self._cache[key]
            # time.monotonic() needs no event loop and ignores wall-clock changes.
            if time.monotonic() - ts < self._ttl:
                return True, value
            del self._cache[key]
        return False, None

    def set(self, tool_name: str, arguments: dict, value: Any):
        key = self._make_key(tool_name, arguments)
        # Evict the oldest entry only when a new key would exceed max_size.
        if key not in self._cache and len(self._cache) >= self._max_size:
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        self._cache[key] = (time.monotonic(), value)

    def invalidate(self, tool_name: str, arguments: dict | None = None):
        if arguments is not None:
            # Remove one entry; an empty dict is a valid argument set.
            self._cache.pop(self._make_key(tool_name, arguments), None)
        else:
            # Remove every cached entry for this tool.
            for k in [k for k in self._cache if k[0] == tool_name]:
                del self._cache[k]
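A cache only helps if it sits in front of the call path and is applied only to tools that are safe to cache. The sketch below shows one possible wiring, assuming the caller supplies an explicit allow-list of idempotent tools. CachedToolCaller, the cacheable parameter, and the tool names are hypothetical, and the class keeps its own small dictionary so the example runs on its own.

```python
import asyncio
import hashlib
import json
import time
from typing import Any, Awaitable, Callable

ToolCaller = Callable[[str, dict], Awaitable[Any]]


class CachedToolCaller:
    """Hypothetical wrapper that checks a TTL cache before calling a tool."""

    def __init__(self, call_tool: ToolCaller, cacheable: set[str], ttl: float = 300.0):
        self._call_tool = call_tool
        self._cacheable = cacheable  # names of tools assumed to be idempotent
        self._ttl = ttl
        self._cache: dict[str, tuple[float, Any]] = {}

    @staticmethod
    def _key(tool_name: str, arguments: dict) -> str:
        raw = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        # Tools with side effects bypass the cache entirely.
        if tool_name not in self._cacheable:
            return await self._call_tool(tool_name, arguments)
        key = self._key(tool_name, arguments)
        entry = self._cache.get(key)
        if entry is not None and time.monotonic() - entry[0] < self._ttl:
            return entry[1]
        result = await self._call_tool(tool_name, arguments)
        self._cache[key] = (time.monotonic(), result)
        return result


async def _demo() -> list[str]:
    calls: list[str] = []

    async def fake_call_tool(tool_name: str, arguments: dict) -> Any:
        # Stand-in for a real MCP tool call; records each invocation.
        calls.append(tool_name)
        return {"tool": tool_name, "echo": arguments}

    caller = CachedToolCaller(fake_call_tool, cacheable={"get_weather"})
    await caller.call_tool("get_weather", {"city": "Oslo"})
    await caller.call_tool("get_weather", {"city": "Oslo"})        # served from cache
    await caller.call_tool("send_email", {"to": "a@example.com"})
    await caller.call_tool("send_email", {"to": "a@example.com"})  # never cached
    return calls


demo_calls = asyncio.run(_demo())
print(demo_calls)  # ['get_weather', 'send_email', 'send_email']
```

The allow-list is the important design choice here: a tool that sends a message or writes a file has to run every time, so the default is to pass calls straight through.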

3. Observability and Tracing

Add OpenTelemetry tracing to MCP client calls:

import time
from typing import Any

from opentelemetry import metrics, trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")

tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram(
    "mcp.tool.duration", unit="s", description="MCP tool call duration"
)


class ObservableMcpClient:
    """Wraps ToolDiscovery.call_tool with a span, a call counter, and a duration histogram."""

    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        # Exceptions are recorded explicitly below, so disable the context
        # manager's automatic recording to avoid duplicate exception events.
        with tracer.start_as_current_span(
            f"mcp.tool.{tool_name}", record_exception=False, set_status_on_exception=False
        ) as span:
            span.set_attribute("mcp.tool.name", tool_name)
            span.set_attribute("mcp.tool.args_count", len(arguments))

            status = "success"
            start = time.monotonic()
            try:
                result = await self._discovery.call_tool(tool_name, arguments)
                span.set_status(Status(StatusCode.OK))
                return result
            except Exception as e:
                status = "error"
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
            finally:
                # Record metrics exactly once, whether the call succeeded or failed.
                duration = time.monotonic() - start
                tool_call_counter.add(1, {"tool": tool_name, "status": status})
                tool_call_duration.record(duration, {"tool": tool_name})

Comparison: 3 Orchestration Approaches

| Dimension | Sequential | Parallel | DAG |
|---|---|---|---|
| Execution | One after another | All tasks simultaneously | Topologically sorted by dependencies |
| Use Case | Strict data dependencies between tools | Completely independent tools | Partial dependencies, partial independence |
| Latency | Sum of all tool durations | Duration of slowest tool | Sum of critical path durations |
| Error Propagation | Previous failure skips all subsequent | Single failure doesn't affect others | Downstream of failed node is skipped |
| Complexity | Low | Medium | High |
| Resource Usage | Low (one connection at a time) | High (concurrent connection pool) | Medium (layered concurrency) |
| Result Consistency | Strong (deterministic order) | Weak (non-deterministic completion order) | Medium (non-deterministic within layers) |
| Timeout Control | Per-step independent timeout | Global + per-task timeout | Per-step + global timeout |
| Typical Scenario | Search -> Extract -> Translate | Search 3 data sources simultaneously | Search A+B -> Merge -> Translate |

Selection Guide:

  • Strict causal dependencies between tools → Sequential orchestration
  • Completely independent tools → Parallel orchestration
  • Partial dependencies between tools → DAG orchestration
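The Latency row of the comparison can be checked with a small calculation. The sketch below estimates end-to-end latency for each approach from per-tool durations and a dependency map. The tool names and durations are made up for illustration, the dependency graph is assumed to be acyclic, and the model ignores scheduling overhead and concurrency limits.

```python
def sequential_latency(durations: dict[str, float]) -> float:
    """Tools run one after another: total is the sum of all durations."""
    return sum(durations.values())


def parallel_latency(durations: dict[str, float]) -> float:
    """All tools start together: total is the slowest tool."""
    return max(durations.values())


def dag_latency(durations: dict[str, float], deps: dict[str, list[str]]) -> float:
    """Each tool starts when its dependencies finish: total is the critical path."""
    finish: dict[str, float] = {}

    def finish_time(node: str) -> float:
        if node not in finish:
            # A node with no dependencies starts at time 0.
            start = max((finish_time(d) for d in deps.get(node, [])), default=0.0)
            finish[node] = start + durations[node]
        return finish[node]

    return max(finish_time(node) for node in durations)


# Hypothetical durations in seconds for the "Search A+B -> Merge -> Translate" scenario.
durations = {"search_a": 2.0, "search_b": 3.0, "merge": 0.5, "translate": 1.0}
deps = {"merge": ["search_a", "search_b"], "translate": ["merge"]}

print(sequential_latency(durations))  # 6.5
print(parallel_latency(durations))    # 3.0 (only valid if the tools were independent)
print(dag_latency(durations, deps))   # 4.5
```

In this example the DAG saves two seconds over the sequential run because the two searches overlap. The 3.0-second figure is out of reach because merge and translate must wait for their inputs.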

For more on AI Agent workflow orchestration, see AI Agent Workflow DAG Orchestration and AI Agent Protocols Comparison.


  • JSON Formatter: When debugging MCP protocol messages, use /en/json/format to format JSON-RPC requests and responses
  • cURL to Code: When testing MCP Server endpoints, use /en/dev/curl-to-code to quickly generate Python client code
  • Base64 Encode/Decode: When handling binary data in MCP transport, use /en/encode/base64 for encoding and decoding

Summary: Python MCP client integration goes far beyond sending JSON-RPC requests. It spans basic stdio connections and remote SSE transport, tool discovery and dynamic registration, sequential, parallel, and DAG orchestration, error handling with circuit breakers and fallbacks, and streaming responses with progress tracking, and each of these needs production-grade engineering. Master these 7 patterns and your AI Agent gains reliable, efficient, and observable tool calling. Remember: MCP protocol parsing is the foundation, multi-tool orchestration is the core, error handling is the safeguard, and streaming responses are the experience. Refer to the official Model Context Protocol Specification for the latest protocol definitions.

