Python MCP Client Integration: 7 Production Patterns from Protocol Parsing to Multi-Tool Orchestration
Why Your AI App Can Never Connect to External Tools: 5 Pain Points of MCP Client Integration
You spent three days building a perfect MCP Server, but Claude Desktop won't connect? Your AI Agent calls three tools, and when the first one times out, the rest all fail? Tool discovery returns a bunch of schemas, but you don't know how to convert them to LLM-compatible function calling format? SSE connections keep dropping and reconnecting, losing data without you knowing? Multi-tool orchestration is either too slow sequentially or too chaotic in parallel?
The root cause: most developers focus only on MCP Server development, ignoring the complexity of Client-side integration. While the MCP protocol defines standard communication, the client must handle JSON-RPC message construction, transport adaptation, tool discovery and conversion, orchestration strategy, error recovery, and a whole suite of engineering challenges.
Key Takeaways:
- Master MCP protocol JSON-RPC 2.0 communication for reliable client-server connections
- Learn tool discovery and dynamic registration for AI Agent auto-adaptation
- Understand multi-tool orchestration, from sequential calls to parallel DAG execution
- Build robust error handling and retry mechanisms for network jitter and service unavailability
- Master SSE streaming response processing for real-time tool call progress feedback
Table of Contents
- MCP Protocol Core Concepts
- Pattern 1: Basic JSON-RPC Client
- Pattern 2: SSE Transport Implementation
- Pattern 3: Tool Discovery and Dynamic Registration
- Pattern 4: Multi-Tool Sequential Orchestration
- Pattern 5: Multi-Tool Parallel Orchestration
- Pattern 6: Error Handling and Retry
- Pattern 7: Streaming Response Processing
- 5 Common Pitfalls and Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Tips
- Comparison: 3 Orchestration Approaches
MCP Protocol Core Concepts
Before diving into the 7 production patterns, we need to understand where the MCP client sits in the protocol stack and its core concepts.
MCP Client Architecture Overview
┌─────────────────────────────────────────────────┐
│ AI Application │
│ (Claude Desktop / ChatGPT / Custom Agent) │
├─────────────────────────────────────────────────┤
│ MCP Client Layer │
│ ┌───────────┐ ┌──────────┐ ┌────────────────┐ │
│ │ JSON-RPC │ │Transport │ │ Tool Registry │ │
│ │ Handler │ │ Adapter │ │ & Discovery │ │
│ └───────────┘ └──────────┘ └────────────────┘ │
│ ┌───────────┐ ┌──────────┐ ┌────────────────┐ │
│ │ Orchest- │ │ Error & │ │ Stream │ │
│ │ ration │ │ Retry │ │ Handler │ │
│ └───────────┘ └──────────┘ └────────────────┘ │
├─────────────────────────────────────────────────┤
│ Transport Layer │
│ stdio / SSE / Streamable HTTP │
├─────────────────────────────────────────────────┤
│ MCP Server │
│ Tools / Resources / Prompts │
└─────────────────────────────────────────────────┘
Core Concepts Reference
| Concept | Description | JSON-RPC Method | Analogy |
|---|---|---|---|
| Initialize | Client-server handshake | initialize | TCP 3-way handshake |
| Tool Discovery | Get list of tools server supports | tools/list | DNS lookup |
| Tool Call | Call a specific tool and get results | tools/call | HTTP POST |
| Resource Read | Read resources exposed by server | resources/read | HTTP GET |
| Prompt Get | Get predefined prompt templates | prompts/get | Template engine |
| Notification | One-way notification, no response needed | No id field | UDP broadcast |
| Cancel | Cancel an in-progress request | notifications/cancelled | HTTP cancellation |
JSON-RPC 2.0 Message Format
MCP is based on JSON-RPC 2.0. Understanding message format is foundational for client development:
// Request message
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "search_docs",
"arguments": {"query": "MCP protocol"}
},
"id": 1
}
// Success response
{
"jsonrpc": "2.0",
"result": {
"content": [
{"type": "text", "text": "Search results..."}
]
},
"id": 1
}
// Error response
{
"jsonrpc": "2.0",
"error": {
"code": -32600,
"message": "Invalid Request"
},
"id": 1
}
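The message shapes above, plus notifications, can be told apart purely by which keys are present. The sketch below illustrates that rule; `classify_message` is a hypothetical helper written for this article, not part of any MCP SDK.

```python
from typing import Any


def classify_message(message: dict[str, Any]) -> str:
    """Classify a decoded JSON-RPC 2.0 message by the keys it carries."""
    if message.get("jsonrpc") != "2.0":
        return "invalid"
    has_id = "id" in message
    if "method" in message:
        # A method with an id expects a reply; without an id it is a notification.
        return "request" if has_id else "notification"
    if has_id and "error" in message:
        return "error"
    if has_id and "result" in message:
        return "response"
    return "invalid"
```

Note that a message carrying both `method` and `id` is a request, and a server may send requests to the client too. A client that routes on `id` alone can mistake such a request for a response.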
If you're interested in MCP Server development, check out our previous article: Python MCP Server Development Guide. For deeper understanding of AI function calling protocols, see AI Function Calling in Production.
Pattern 1: Basic JSON-RPC Client
Problem: Your AI application needs to connect to an MCP Server, but you don't know how to construct proper JSON-RPC messages, manage request IDs, or handle responses.
Solution: Build a complete JSON-RPC 2.0 client that encapsulates message construction, sending, response matching, and timeout handling.
Complete Implementation
import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable
@dataclass
class JsonRpcRequest:
jsonrpc: str = "2.0"
method: str = ""
params: dict[str, Any] = field(default_factory=dict)
id: int | str = field(default_factory=lambda: uuid.uuid4().int % 2**31)
@dataclass
class JsonRpcResponse:
id: int | str
result: Any = None
error: dict[str, Any] | None = None
class JsonRpcClient:
def __init__(self, timeout: float = 30.0):
self._request_id = 0
self._pending: dict[int | str, asyncio.Future] = {}
self._timeout = timeout
self._notification_handlers: dict[str, Callable] = {}
self._writer: asyncio.StreamWriter | None = None
self._reader: asyncio.StreamReader | None = None
def _next_id(self) -> int:
self._request_id += 1
return self._request_id
    async def connect_stdio(self, command: str, args: list[str] | None = None, env: dict[str, str] | None = None):
        import os
        proc_env = {**os.environ, **(env or {})}
        self._process = await asyncio.create_subprocess_exec(
            command,
            *(args or []),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Inherit stderr: an stderr pipe that nobody reads can fill up and stall the server.
            stderr=None,
            env=proc_env,
        )
        self._reader = self._process.stdout
        self._writer = self._process.stdin
        # Keep a reference so the reader task is not garbage-collected while running.
        self._read_task = asyncio.create_task(self._read_loop())
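    async def _read_loop(self):
        # Read newline-delimited JSON-RPC messages until the server closes stdout.
        while self._reader:
            line = await self._reader.readline()
            if not line:
                break  # EOF: the server process exited or closed its stdout
            try:
                message = json.loads(line.decode().strip())
            except json.JSONDecodeError:
                continue  # skip non-JSON output such as stray log lines
            await self._handle_message(message)
        # Fail outstanding requests right away instead of letting each one time out.
        for future in self._pending.values():
            if not future.done():
                future.set_exception(ConnectionError("MCP server closed the connection"))
        self._pending.clear()

    async def _handle_message(self, message: dict):
        if "id" in message and "method" not in message:
            # Response: match it to the pending request with the same id.
            future = self._pending.pop(message["id"], None)
            if future and not future.done():
                if "error" in message:
                    future.set_exception(
                        McpError(message["error"]["code"], message["error"]["message"])
                    )
                else:
                    future.set_result(JsonRpcResponse(id=message["id"], result=message.get("result")))
        elif "method" in message and "id" not in message:
            # Notification: run the handler as a task. A handler that calls send_request()
            # would otherwise block the read loop that has to deliver its response.
            handler = self._notification_handlers.get(message["method"])
            if handler:
                asyncio.create_task(handler(message.get("params", {})))
        # Server-to-client requests (both "method" and "id", e.g. ping) are not handled here.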
    async def send_request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        if not self._writer:
            # Without this check the call would wait for the full timeout with nothing sent.
            raise RuntimeError("Not connected: call connect_stdio() first")
        request_id = self._next_id()
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        # get_running_loop() is the supported way to reach the loop from inside a coroutine.
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        data = json.dumps(request) + "\n"
        self._writer.write(data.encode())
        await self._writer.drain()
        try:
            response = await asyncio.wait_for(future, timeout=self._timeout)
            return response.result
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)
            raise McpTimeoutError(f"Request {method} timed out after {self._timeout}s")
async def send_notification(self, method: str, params: dict[str, Any] | None = None):
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
}
data = json.dumps(notification) + "\n"
if self._writer:
self._writer.write(data.encode())
await self._writer.drain()
def on_notification(self, method: str, handler: Callable):
self._notification_handlers[method] = handler
async def close(self):
if self._writer:
self._writer.close()
if hasattr(self, "_process") and self._process:
self._process.terminate()
await self._process.wait()
class McpError(Exception):
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"MCP Error [{code}]: {message}")
class McpTimeoutError(Exception):
pass
Initialization Handshake
async def initialize_client(client: JsonRpcClient) -> dict:
result = await client.send_request("initialize", {
"protocolVersion": "2025-03-26",
"capabilities": {
"roots": {"listChanged": True},
"sampling": {},
},
"clientInfo": {
"name": "toolsku-mcp-client",
"version": "1.0.0",
},
})
await client.send_notification("notifications/initialized")
return result
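The `initialize` result carries the protocol version the server chose, which may differ from the one the client requested. A minimal sketch of checking it before continuing follows; `check_protocol_version` and the `SUPPORTED_VERSIONS` tuple are illustrative names, and the list of versions a real client supports is an assumption to adjust.

```python
# Assumption: the protocol revisions this client is able to speak.
SUPPORTED_VERSIONS = ("2025-03-26", "2024-11-05")


def check_protocol_version(init_result: dict, supported=SUPPORTED_VERSIONS) -> str:
    """Return the negotiated version, or raise if the server chose one we cannot speak."""
    version = init_result.get("protocolVersion")
    if version not in supported:
        raise RuntimeError(f"Unsupported MCP protocol version: {version!r}")
    return version
```

Calling this on the value returned by `initialize_client` and closing the connection on failure avoids sending requests the server may interpret differently.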
Usage Example
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
server_info = await initialize_client(client)
print(f"Connected to: {server_info['serverInfo']['name']}")
tools = await client.send_request("tools/list")
for tool in tools.get("tools", []):
print(f" - {tool['name']}: {tool['description']}")
result = await client.send_request("tools/call", {
"name": "search_docs",
"arguments": {"query": "MCP protocol parsing"},
})
print(f"Result: {result}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 2: SSE Transport Implementation
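Problem: stdio transport only works for local process communication. Production environments need network-based connections to remote MCP Servers. HTTP with SSE (Server-Sent Events) is the remote transport defined in the 2024-11-05 revision of MCP. The 2025-03-26 revision replaces it with Streamable HTTP, but many deployed servers still expose the SSE endpoints, so clients often need to support it.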
Solution: Implement a complete SSE transport layer including connection management, event parsing, message routing, and auto-reconnection.
SSE Transport Architecture
┌──────────────┐ POST /messages ┌──────────────┐
│ MCP Client │ ──────────────────────> │ MCP Server │
│ │ │ │
│ SSE Handler │ <──── GET /sse ──────── │ SSE Endpoint│
│ │ EventStream │ │
└──────────────┘ └──────────────┘
│ │
│ 1. GET /sse to establish SSE connection │
│ 2. Receive endpoint event for POST URL │
│ 3. POST /messages to send JSON-RPC reqs │
│ 4. Receive JSON-RPC responses via SSE │
└──────────────────────────────────────────┘
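Step 2 is easy to get wrong: the `endpoint` event payload may be a path, a relative reference, or a full URL. The sketch below resolves it against the SSE URL with the standard library. `resolve_message_endpoint` is a hypothetical helper, and the same-origin check is a precaution assumed for this example rather than something the flow above requires.

```python
from urllib.parse import urljoin, urlsplit


def resolve_message_endpoint(sse_url: str, endpoint_data: str) -> str:
    """Resolve the `endpoint` event payload against the SSE URL."""
    resolved = urljoin(sse_url, endpoint_data.strip())
    base, target = urlsplit(sse_url), urlsplit(resolved)
    # Precaution (assumption of this sketch): refuse to POST to a different origin.
    if (base.scheme, base.netloc) != (target.scheme, target.netloc):
        raise ValueError(f"Endpoint origin mismatch: {resolved}")
    return resolved
```

Resolving against the SSE URL rather than concatenating strings keeps the result correct when the server is mounted under a path prefix.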
Complete Implementation
import asyncio
import json
import logging
from typing import Any, Callable
from dataclasses import dataclass, field
import httpx
logger = logging.getLogger(__name__)
@dataclass
class SseEvent:
event: str = "message"
data: str = ""
id: str | None = None
retry: int | None = None
class SseTransport:
def __init__(
self,
server_url: str,
reconnect_interval: float = 5.0,
max_reconnect_attempts: int = 10,
headers: dict[str, str] | None = None,
):
self._server_url = server_url.rstrip("/")
self._reconnect_interval = reconnect_interval
self._max_reconnect_attempts = max_reconnect_attempts
self._headers = headers or {}
self._message_endpoint: str | None = None
self._session: httpx.AsyncClient | None = None
self._running = False
self._reconnect_count = 0
self._event_handlers: dict[str, list[Callable]] = {"message": [], "endpoint": []}
self._last_event_id: str | None = None
async def connect(self):
self._session = httpx.AsyncClient(
headers={**self._headers, "Accept": "text/event-stream"},
timeout=httpx.Timeout(30.0, read=300.0),
)
self._running = True
asyncio.create_task(self._sse_listener())
    async def _sse_listener(self):
        while self._running:
            try:
                sse_url = f"{self._server_url}/sse"
                # Last-Event-ID is an HTTP request header in the SSE spec, not a query parameter.
                headers = {}
                if self._last_event_id:
                    headers["Last-Event-ID"] = self._last_event_id
                async with self._session.stream("GET", sse_url, headers=headers) as response:
                    response.raise_for_status()
                    self._reconnect_count = 0
                    current_event = SseEvent()
                    async for line in response.aiter_lines():
                        if not self._running:
                            break
                        if line.startswith(":"):
                            continue  # comment line, often used as a keep-alive
                        if line == "":
                            # A blank line terminates the current event.
                            if current_event.data:
                                await self._dispatch_event(current_event)
                            current_event = SseEvent()
                            continue
                        # A line without a colon is a field name with an empty value.
                        field_name, _, field_value = line.partition(":")
                        if field_value.startswith(" "):
                            field_value = field_value[1:]  # strip one leading space only
                        if field_name == "event":
                            current_event.event = field_value
                        elif field_name == "data":
                            if current_event.data:
                                current_event.data += "\n"
                            current_event.data += field_value
                        elif field_name == "id":
                            current_event.id = field_value
                            self._last_event_id = field_value
                        elif field_name == "retry":
                            try:
                                current_event.retry = int(field_value)
                            except ValueError:
                                pass
            except httpx.HTTPStatusError as e:
                logger.error(f"SSE HTTP error: {e.response.status_code}")
            except httpx.RequestError as e:
                logger.error(f"SSE connection error: {e}")
            except Exception as e:
                logger.error(f"SSE unexpected error: {e}")
            if self._running:
                self._reconnect_count += 1
                if self._reconnect_count > self._max_reconnect_attempts:
                    logger.error("Max reconnect attempts reached, stopping")
                    self._running = False
                    break
                # Exponential backoff, capped at 60 seconds.
                wait = min(self._reconnect_interval * (2 ** (self._reconnect_count - 1)), 60.0)
                logger.info(f"Reconnecting in {wait}s (attempt {self._reconnect_count})")
                await asyncio.sleep(wait)
    async def _dispatch_event(self, event: SseEvent):
        if event.event == "endpoint":
            from urllib.parse import urljoin
            # The payload may be a relative path or an absolute URL; resolve it against the SSE URL.
            self._message_endpoint = urljoin(f"{self._server_url}/sse", event.data)
            logger.info(f"Message endpoint: {self._message_endpoint}")
        handlers = self._event_handlers.get(event.event, [])
        for handler in handlers:
            try:
                if event.event == "endpoint":
                    await handler(event.data)
                else:
                    message = json.loads(event.data)
                    await handler(message)
            except Exception as e:
                logger.error(f"Event handler error: {e}")
async def send_message(self, message: dict[str, Any]) -> None:
if not self._message_endpoint:
raise RuntimeError("SSE transport not initialized, endpoint not received")
if not self._session:
raise RuntimeError("SSE session not connected")
response = await self._session.post(
self._message_endpoint,
json=message,
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
def on_event(self, event_type: str, handler: Callable):
if event_type not in self._event_handlers:
self._event_handlers[event_type] = []
self._event_handlers[event_type].append(handler)
async def close(self):
self._running = False
if self._session:
await self._session.aclose()
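The field-parsing rules inside `_sse_listener` are easier to verify when pulled out of the network loop. The sketch below is a standalone parser over already-split lines; `parse_sse` is a hypothetical helper for unit testing, not part of httpx or any MCP SDK.

```python
from typing import Iterable, Iterator, Optional, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str, Optional[str]]]:
    """Yield (event, data, last_event_id) tuples from already-split SSE lines."""
    event, data, event_id = "message", [], None
    for line in lines:
        if line == "":
            # A blank line ends the event; events without data are dropped.
            if data:
                yield event, "\n".join(data), event_id
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip exactly one leading space
        if name == "event":
            event = value
        elif name == "data":
            data.append(value)  # multiple data lines are joined with newlines
        elif name == "id":
            event_id = value  # persists across events until replaced
```

Feeding recorded server output through a function like this makes it possible to test multi-line `data` fields and keep-alive comments without a live connection.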
Integration Usage
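class McpSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._request_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("message", self._on_message)
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, endpoint: str):
        self._endpoint_ready.set()

    async def connect(self, timeout: float = 10.0):
        await self._transport.connect()
        # The POST URL arrives asynchronously as the first SSE event. Wait for it,
        # otherwise the first send_message() fails with "endpoint not received".
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=timeout)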
async def _on_message(self, message: dict):
if "id" in message:
future = self._pending.pop(message["id"], None)
if future and not future.done():
if "error" in message:
future.set_exception(McpError(message["error"]["code"], message["error"]["message"]))
else:
future.set_result(message.get("result"))
    async def send_request(self, method: str, params: dict | None = None) -> Any:
        # Capture the id locally: self._request_id may advance while this call is awaiting.
        self._request_id += 1
        request_id = self._request_id
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id,
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        await self._transport.send_message(request)
        try:
            return await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)
            raise McpTimeoutError(f"Request {method} timed out")
async def initialize(self) -> dict:
result = await self.send_request("initialize", {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {"name": "toolsku-sse-client", "version": "1.0.0"},
})
await self._transport.send_message({
"jsonrpc": "2.0",
"method": "notifications/initialized",
})
return result
async def close(self):
await self._transport.close()
async def main():
client = McpSseClient("http://localhost:8080")
await client.connect()
info = await client.initialize()
print(f"Connected: {info}")
tools = await client.send_request("tools/list")
print(f"Available tools: {tools}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 3: Tool Discovery and Dynamic Registration
Problem: MCP Server tool lists are dynamic. Clients need to auto-discover tools and convert them to LLM-compatible function calling formats, while supporting tool change notifications.
Solution: Implement tool discovery, schema conversion, dynamic registration, and change listening.
Tool Discovery Architecture
┌──────────────┐ tools/list ┌──────────────┐
│ MCP Client │ ────────────> │ MCP Server │
│ │ <──────────── │ │
│ Tool │ Tool Schema │ Tool │
│ Registry │ │ Definitions │
│ │ tools/ │ │
│ ┌────────┐ │ list_changed │ │
│ │ OpenAI │ │ <──────────── │ │
│ │ Format │ │ │ │
│ ├────────┤ │ │ │
│ │ Claude │ │ │ │
│ │ Format │ │ │ │
│ ├────────┤ │ │ │
│ │ Gemini │ │ │ │
│ │ Format │ │ │ │
│ └────────┘ │ │ │
└──────────────┘ └──────────────┘
Complete Implementation
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class McpTool:
name: str
description: str
input_schema: dict[str, Any]
annotations: dict[str, Any] = field(default_factory=dict)
@classmethod
def from_mcp(cls, data: dict) -> "McpTool":
return cls(
name=data["name"],
description=data.get("description", ""),
input_schema=data.get("inputSchema", {}),
annotations=data.get("annotations", {}),
)
class ToolRegistry:
def __init__(self):
self._tools: dict[str, McpTool] = {}
self._change_handlers: list[Callable] = []
def register(self, tool: McpTool):
self._tools[tool.name] = tool
logger.info(f"Registered tool: {tool.name}")
self._notify_change("registered", tool)
def unregister(self, name: str):
if name in self._tools:
tool = self._tools.pop(name)
logger.info(f"Unregistered tool: {name}")
self._notify_change("unregistered", tool)
def get(self, name: str) -> McpTool | None:
return self._tools.get(name)
def list_tools(self) -> list[McpTool]:
return list(self._tools.values())
def on_change(self, handler: Callable):
self._change_handlers.append(handler)
def _notify_change(self, action: str, tool: McpTool):
for handler in self._change_handlers:
try:
handler(action, tool)
except Exception as e:
logger.error(f"Change handler error: {e}")
def to_openai_tools(self) -> list[dict]:
result = []
for tool in self._tools.values():
result.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.input_schema,
},
})
return result
def to_claude_tools(self) -> list[dict]:
result = []
for tool in self._tools.values():
result.append({
"name": tool.name,
"description": tool.description,
"input_schema": tool.input_schema,
})
return result
def to_gemini_tools(self) -> list[dict]:
result = []
for tool in self._tools.values():
params = tool.input_schema.get("properties", {})
required = tool.input_schema.get("required", [])
declarations = []
for param_name, param_schema in params.items():
declarations.append({
"name": param_name,
"description": param_schema.get("description", ""),
"type": self._json_type_to_gemini(param_schema.get("type", "string")),
"required": param_name in required,
})
result.append({
"name": tool.name,
"description": tool.description,
"parameters": {
"type": "OBJECT",
"properties": {d["name"]: {"type": d["type"]} for d in declarations},
"required": [d["name"] for d in declarations if d["required"]],
},
})
return result
@staticmethod
def _json_type_to_gemini(json_type: str) -> str:
mapping = {
"string": "STRING",
"integer": "INTEGER",
"number": "NUMBER",
"boolean": "BOOLEAN",
"array": "ARRAY",
"object": "OBJECT",
}
return mapping.get(json_type, "STRING")
class ToolDiscovery:
def __init__(self, client):
self._client = client
self._registry = ToolRegistry()
self._client.on_notification(
"notifications/tools/list_changed",
self._on_tools_changed,
)
@property
def registry(self) -> ToolRegistry:
return self._registry
async def discover(self) -> list[McpTool]:
result = await self._client.send_request("tools/list")
tools = []
for tool_data in result.get("tools", []):
tool = McpTool.from_mcp(tool_data)
self._registry.register(tool)
tools.append(tool)
logger.info(f"Discovered {len(tools)} tools")
return tools
async def _on_tools_changed(self, params: dict):
logger.info("Tools list changed, re-discovering...")
self._registry._tools.clear()
await self.discover()
async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
tool = self._registry.get(name)
if not tool:
raise ValueError(f"Tool not found: {name}")
result = await self._client.send_request("tools/call", {
"name": name,
"arguments": arguments,
})
contents = result.get("content", [])
text_parts = []
for content in contents:
if content.get("type") == "text":
text_parts.append(content["text"])
elif content.get("type") == "image":
text_parts.append(f"[Image: {content.get('mimeType', 'unknown')}]")
elif content.get("type") == "resource":
text_parts.append(f"[Resource: {content.get('resource', {}).get('uri', '')}]")
return "\n".join(text_parts) if text_parts else str(result)
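The `discover` method above issues a single `tools/list` call. The MCP specification makes this method paginated: a result may carry a `nextCursor` value to pass back as `cursor` in the next request. A minimal sketch of following the cursor, assuming a `send_request(method, params)` coroutine shaped like the one in Pattern 1, is shown below. The `list_all_tools` name and the `max_pages` guard are assumptions of this example.

```python
import asyncio
from typing import Any, Awaitable, Callable

SendRequest = Callable[[str, dict], Awaitable[dict]]


async def list_all_tools(send_request: SendRequest, max_pages: int = 100) -> list[dict[str, Any]]:
    """Collect every page of tools/list by following nextCursor."""
    tools: list[dict[str, Any]] = []
    cursor = None
    for _ in range(max_pages):  # guard against a server that never stops paging
        params = {"cursor": cursor} if cursor else {}
        page = await send_request("tools/list", params)
        tools.extend(page.get("tools", []))
        cursor = page.get("nextCursor")
        if not cursor:
            break
    return tools
```

With the client from Pattern 1 this would be called as `await list_all_tools(client.send_request)`, and each returned dict can be passed to `McpTool.from_mcp`.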
Dynamic Registration Usage
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
tools = await discovery.discover()
print(f"Discovered {len(tools)} tools:")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
openai_tools = discovery.registry.to_openai_tools()
print(f"\nOpenAI format: {json.dumps(openai_tools[:1], indent=2)}")
claude_tools = discovery.registry.to_claude_tools()
print(f"\nClaude format: {json.dumps(claude_tools[:1], indent=2)}")
result = await discovery.call_tool("search_docs", {"query": "MCP protocol parsing"})
print(f"\nTool result: {result}")
await client.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Pattern 4: Multi-Tool Sequential Orchestration
Problem: AI Agents need to call multiple tools in sequence, where each tool's input depends on the previous tool's output. How do you elegantly manage such sequential dependency chains?
Solution: Implement a sequential orchestrator with dependency injection, intermediate result passing, and conditional branching.
Sequential Orchestration Flow
User Input: "Search MCP docs and translate summary"
│
▼
┌──────────┐ result ┌──────────┐ result ┌──────────┐
│ Tool 1 │ ────────────> │ Tool 2 │ ────────────> │ Tool 3 │
│ Search │ │ Extract │ │ Translate│
│ Docs │ │ Summary │ │ Content │
└──────────┘ └──────────┘ └──────────┘
│ │ │
│ $search_result │ $summary_text │ $translated
▼ ▼ ▼
Context Store Context Store Context Store
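The flow above reduces to a loop that feeds each output into the next call and records it in the context store. The sketch below shows only that core idea with a stubbed tool caller. `run_chain` and the `(step_name, tool_name, arg_name)` triple are simplifications invented for illustration, not the orchestrator implemented in this section.

```python
import asyncio
from typing import Any, Awaitable, Callable

ToolCaller = Callable[[str, dict], Awaitable[Any]]


async def run_chain(
    steps: list[tuple[str, str, str]],
    call_tool: ToolCaller,
    user_input: str,
) -> dict[str, Any]:
    """Run (step_name, tool_name, arg_name) triples, feeding each output to the next step."""
    context: dict[str, Any] = {"input": user_input}
    previous: Any = user_input
    for step_name, tool_name, arg_name in steps:
        previous = await call_tool(tool_name, {arg_name: previous})
        context[step_name] = previous  # the context store keeps every intermediate result
    return context
```

Keeping every intermediate result, rather than only the last one, is what lets later steps or conditions refer back to any earlier output by name.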
Complete Implementation
import asyncio
import json
import re
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class StepResult:
step_name: str
tool_name: str
input_args: dict[str, Any]
output: Any
success: bool = True
error: str | None = None
duration_ms: float = 0.0
@dataclass
class PipelineStep:
name: str
tool_name: str
arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
condition: Callable[[dict[str, Any]], bool] | None = None
timeout: float = 30.0
retry_count: int = 0
def resolve_arguments(self, context: dict[str, Any]) -> dict[str, Any]:
if callable(self.arguments):
return self.arguments(context)
resolved = {}
for key, value in self.arguments.items():
if isinstance(value, str) and value.startswith("$"):
ref_path = value[1:]
resolved[key] = _deep_get(context, ref_path)
else:
resolved[key] = value
return resolved
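def _deep_get(data: dict, path: str) -> Any:
    # The orchestrator stores results under flat keys such as "steps.search",
    # so try the whole path as a key before walking nested structures.
    if path in data:
        return data[path]
    keys = path.split(".")
    current = data
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else None
        else:
            return None
    return current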
class SequentialOrchestrator:
def __init__(self, discovery: ToolDiscovery):
self._discovery = discovery
self._steps: list[PipelineStep] = []
self._context: dict[str, Any] = {}
def add_step(self, step: PipelineStep) -> "SequentialOrchestrator":
self._steps.append(step)
return self
def set_context(self, key: str, value: Any):
self._context[key] = value
async def execute(self) -> list[StepResult]:
results = []
context = dict(self._context)
for step in self._steps:
if step.condition and not step.condition(context):
logger.info(f"Skipping step '{step.name}': condition not met")
continue
resolved_args = step.resolve_arguments(context)
logger.info(f"Executing step '{step.name}' with tool '{step.tool_name}'")
import time
start = time.monotonic()
attempt = 0
last_error = None
while attempt <= step.retry_count:
try:
output = await asyncio.wait_for(
self._discovery.call_tool(step.tool_name, resolved_args),
timeout=step.timeout,
)
duration = (time.monotonic() - start) * 1000
context[f"steps.{step.name}"] = output
context["last_result"] = output
results.append(StepResult(
step_name=step.name,
tool_name=step.tool_name,
input_args=resolved_args,
output=output,
success=True,
duration_ms=duration,
))
break
except Exception as e:
last_error = str(e)
attempt += 1
if attempt <= step.retry_count:
logger.warning(f"Step '{step.name}' failed, retrying ({attempt}/{step.retry_count})")
await asyncio.sleep(1.0 * attempt)
else:
duration = (time.monotonic() - start) * 1000
results.append(StepResult(
step_name=step.name,
tool_name=step.tool_name,
input_args=resolved_args,
output=None,
success=False,
error=last_error,
duration_ms=duration,
))
break
return results
Usage Example
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
orchestrator = SequentialOrchestrator(discovery)
orchestrator.set_context("user_query", "MCP protocol parsing and Python client integration")
orchestrator.add_step(PipelineStep(
name="search",
tool_name="search_docs",
arguments={"query": "$user_query", "limit": 5},
))
orchestrator.add_step(PipelineStep(
name="extract",
tool_name="extract_summary",
arguments={"text": "$steps.search"},
condition=lambda ctx: bool(ctx.get("steps.search")),
))
orchestrator.add_step(PipelineStep(
name="translate",
tool_name="translate_text",
arguments={"text": "$steps.extract", "target_lang": "en"},
condition=lambda ctx: bool(ctx.get("steps.extract")),
))
results = await orchestrator.execute()
for r in results:
status = "OK" if r.success else "FAIL"
print(f"[{status}] [{r.step_name}] {r.tool_name} ({r.duration_ms:.0f}ms)")
if r.error:
print(f" Error: {r.error}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 5: Multi-Tool Parallel Orchestration
Problem: When multiple tool calls have no dependencies between them, sequential execution wastes time. How do you safely call multiple MCP tools in parallel and aggregate results?
Solution: Implement a parallel orchestrator based on asyncio with concurrency control, result aggregation, and DAG dependency execution.
Parallel Orchestration Architecture
┌──────────┐
│ Request │
└────┬─────┘
│
┌──────────┼──────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Tool A │ │ Tool B │ │ Tool C │
│ Search │ │ Query DB │ │ Call API │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└──────────┬──┘────────────┘
│
▼
┌─────────────┐
│ Aggregate │
└─────────────┘
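Once tasks declare dependencies, the fan-out and aggregate shape above generalizes to running the graph in waves: each wave contains the tasks whose dependencies have all finished. The sketch below computes those waves without doing any I/O. `execution_waves` is a hypothetical helper for illustration, taking a mapping from task name to the names it depends on.

```python
def execution_waves(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group task names into waves; each task depends only on tasks in earlier waves."""
    remaining = dict(dependencies)
    done: set[str] = set()
    waves: list[list[str]] = []
    while remaining:
        ready = sorted(
            name for name, deps in remaining.items()
            if all(dep in done for dep in deps)
        )
        if not ready:
            # Nothing can start: a cycle, or a dependency that was never declared.
            raise ValueError(f"Cycle or unknown dependency among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves
```

Tasks inside one wave are safe to run concurrently, for example with `asyncio.gather` under a semaphore. A wave only starts after the previous one has finished, so a slow task delays everything in the next wave even if they do not depend on it.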
Complete Implementation
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class ParallelTask:
name: str
tool_name: str
arguments: dict[str, Any] | Callable[[dict[str, Any]], dict[str, Any]]
depends_on: list[str] = field(default_factory=list)
timeout: float = 30.0
retry_count: int = 0
@dataclass
class TaskResult:
name: str
tool_name: str
output: Any
success: bool = True
error: str | None = None
duration_ms: float = 0.0
class ParallelOrchestrator:
def __init__(self, discovery: ToolDiscovery, max_concurrency: int = 5):
self._discovery = discovery
self._max_concurrency = max_concurrency
self._tasks: list[ParallelTask] = []
def add_task(self, task: ParallelTask) -> "ParallelOrchestrator":
self._tasks.append(task)
return self
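    async def execute(self, initial_context: dict[str, Any] | None = None) -> dict[str, TaskResult]:
        context = dict(initial_context or {})
        results: dict[str, TaskResult] = {}
        semaphore = asyncio.Semaphore(self._max_concurrency)
        completed = set()
        remaining = list(self._tasks)
        while remaining:
            # Run in waves: a task is ready once all of its dependencies have finished.
            ready = [
                t for t in remaining
                if all(dep in completed for dep in t.depends_on)
            ]
            if not ready:
                # No progress is possible: a cycle, or a dependency on a task that was never added.
                # Record a failure for each stuck task so callers do not silently lose them.
                logger.error("Unresolvable dependencies: %s", [t.name for t in remaining])
                for task in remaining:
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error="Unresolvable dependency (cycle or unknown task)",
                    )
                break
            coros = [self._execute_task(task, context, results, semaphore) for task in ready]
            task_results = await asyncio.gather(*coros, return_exceptions=True)
            for task, result in zip(ready, task_results):
                if isinstance(result, Exception):
                    results[task.name] = TaskResult(
                        name=task.name,
                        tool_name=task.tool_name,
                        output=None,
                        success=False,
                        error=str(result),
                    )
                else:
                    results[task.name] = result
                    context[f"tasks.{task.name}"] = result.output
                # Failed tasks also count as finished, so dependents still run and see a missing input.
                completed.add(task.name)
                remaining.remove(task)
        return results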
async def _execute_task(
self,
task: ParallelTask,
context: dict[str, Any],
results: dict[str, TaskResult],
semaphore: asyncio.Semaphore,
) -> TaskResult:
async with semaphore:
resolved_args = self._resolve_arguments(task, context, results)
import time
start = time.monotonic()
attempt = 0
last_error = None
while attempt <= task.retry_count:
try:
output = await asyncio.wait_for(
self._discovery.call_tool(task.tool_name, resolved_args),
timeout=task.timeout,
)
duration = (time.monotonic() - start) * 1000
return TaskResult(
name=task.name,
tool_name=task.tool_name,
output=output,
success=True,
duration_ms=duration,
)
except Exception as e:
last_error = str(e)
attempt += 1
if attempt <= task.retry_count:
await asyncio.sleep(0.5 * attempt)
duration = (time.monotonic() - start) * 1000
return TaskResult(
name=task.name,
tool_name=task.tool_name,
output=None,
success=False,
error=last_error,
duration_ms=duration,
)
def _resolve_arguments(
self,
task: ParallelTask,
context: dict[str, Any],
results: dict[str, TaskResult],
) -> dict[str, Any]:
if callable(task.arguments):
return task.arguments(context)
resolved = {}
for key, value in task.arguments.items():
if isinstance(value, str) and value.startswith("$"):
ref_path = value[1:]
if ref_path.startswith("tasks."):
task_name = ref_path.split(".")[1]
if task_name in results:
resolved[key] = results[task_name].output
else:
resolved[key] = _deep_get(context, ref_path)
else:
resolved[key] = value
return resolved
class DagOrchestrator(ParallelOrchestrator):
def validate_dag(self) -> list[str]:
errors = []
task_names = {t.name for t in self._tasks}
for task in self._tasks:
for dep in task.depends_on:
if dep not in task_names:
errors.append(f"Task '{task.name}' depends on unknown task '{dep}'")
visited = set()
path = set()
def has_cycle(name: str) -> bool:
visited.add(name)
path.add(name)
task = next((t for t in self._tasks if t.name == name), None)
if task:
for dep in task.depends_on:
if dep in path:
return True
if dep not in visited and has_cycle(dep):
return True
path.remove(name)
return False
for task in self._tasks:
if task.name not in visited:
if has_cycle(task.name):
errors.append("Circular dependency detected in DAG")
break
return errors
Usage Example
async def main():
client = JsonRpcClient(timeout=30.0)
await client.connect_stdio("python", ["mcp_server.py"])
await initialize_client(client)
discovery = ToolDiscovery(client)
await discovery.discover()
orchestrator = ParallelOrchestrator(discovery, max_concurrency=3)
orchestrator.add_task(ParallelTask(
name="search_web",
tool_name="web_search",
arguments={"query": "MCP protocol 2026"},
))
orchestrator.add_task(ParallelTask(
name="search_docs",
tool_name="search_docs",
arguments={"query": "Model Context Protocol Python"},
))
orchestrator.add_task(ParallelTask(
name="query_db",
tool_name="query_database",
arguments={"sql": "SELECT * FROM tools WHERE category='MCP'"},
))
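    orchestrator.add_task(ParallelTask(
        name="merge_results",
        tool_name="merge_and_deduplicate",
        # A "$" reference resolves to a single value, so build the list with a callable.
        # execute() stores each finished task's output under the flat key "tasks.<name>".
        arguments=lambda ctx: {
            "sources": [
                ctx.get("tasks.search_web"),
                ctx.get("tasks.search_docs"),
                ctx.get("tasks.query_db"),
            ],
        },
        depends_on=["search_web", "search_docs", "query_db"],
    ))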
results = await orchestrator.execute({"user_query": "MCP client integration"})
for name, result in results.items():
status = "OK" if result.success else "FAIL"
print(f"[{status}] [{name}] {result.tool_name} ({result.duration_ms:.0f}ms)")
if result.error:
print(f" Error: {result.error}")
await client.close()
if __name__ == "__main__":
asyncio.run(main())
Pattern 6: Error Handling and Retry
Problem: MCP clients need robust error handling and retry mechanisms when facing network instability, server overload, or tool execution exceptions.
Solution: Implement layered error handling with exponential backoff retry, circuit breakers, and fallback strategies.
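To make the backoff part concrete, here is a minimal sketch of the delay schedule, assuming a base delay of 1 second, a factor of 2, and a 60 second cap. The helper name backoff_delay is hypothetical and exists only for this illustration.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, factor: float = 2.0,
                  max_delay: float = 60.0, jitter: bool = False) -> float:
    """Delay before retry number `attempt` (1-based), capped at max_delay."""
    delay = min(base * factor ** (attempt - 1), max_delay)
    if jitter:
        # Scale into [0.5 * delay, delay) so clients do not retry in lockstep
        delay *= 0.5 + random.random() * 0.5
    return delay


print([backoff_delay(n) for n in range(1, 9)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap matters more than it looks: without it, the eighth retry would wait 128 seconds, which is usually longer than a caller is willing to block.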
Error Handling Architecture
┌────────────────────────────────────────────┐
│ Error Handling Layer │
│ │
│ ┌──────────┐ ┌──────────┐ ┌─────────┐ │
│ │ Retry │ │ Circuit │ │ Fallback│ │
│ │ Strategy │ │ Breaker │ │ Handler │ │
│ └──────────┘ └──────────┘ └─────────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Error Classification │ │
│ │ Transient / Permanent / Unknown │ │
│ └──────────────────────────────────────┘ │
└────────────────────────────────────────────┘
Complete Implementation
import asyncio
import logging
import random
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)

# McpError is not defined in this block: it is the client's JSON-RPC error type
# and must expose a `code` attribute.

class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_errors: list[int] = field(default_factory=lambda: [
        -32603,  # Internal error
        -32000,  # Server error
    ])

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitOpenError(Exception):
    pass

class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig | None = None):
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0.0
        self._half_open_calls = 0

    @property
    def state(self) -> CircuitState:
        # An open circuit moves to half-open once the recovery timeout has passed
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time >= self._config.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.HALF_OPEN:
            # Let a limited number of probe calls through
            if self._half_open_calls < self._config.half_open_max_calls:
                self._half_open_calls += 1
                return True
            return False
        return False

    def record_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._success_count = 0
        else:
            self._failure_count = 0

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            # Any failure while probing reopens the circuit
            self._state = CircuitState.OPEN
            self._success_count = 0
        elif self._failure_count >= self._config.failure_threshold:
            self._state = CircuitState.OPEN

class ResilientMcpClient:
    def __init__(
        self,
        client,
        retry_config: RetryConfig | None = None,
        circuit_config: CircuitBreakerConfig | None = None,
    ):
        self._client = client
        self._retry_config = retry_config or RetryConfig()
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_config or CircuitBreakerConfig()
        self._fallback_handlers: dict[str, Callable] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self._circuit_breakers:
            self._circuit_breakers[server_name] = CircuitBreaker(self._circuit_config)
        return self._circuit_breakers[server_name]

    def register_fallback(self, tool_name: str, handler: Callable):
        self._fallback_handlers[tool_name] = handler

    def classify_error(self, error: Exception) -> ErrorCategory:
        if isinstance(error, (ConnectionError, asyncio.TimeoutError, OSError)):
            return ErrorCategory.TRANSIENT
        if isinstance(error, McpError):
            if error.code in self._retry_config.retryable_errors:
                return ErrorCategory.TRANSIENT
            if error.code in [-32600, -32601, -32602]:
                return ErrorCategory.PERMANENT
        return ErrorCategory.UNKNOWN

    async def call_with_retry(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        server_name: str = "default",
    ) -> Any:
        breaker = self._get_breaker(server_name)
        if not breaker.allow_request():
            fallback = self._fallback_handlers.get(tool_name)
            if fallback:
                logger.warning(f"Circuit open for '{server_name}', using fallback for '{tool_name}'")
                return await fallback(arguments)
            raise CircuitOpenError(f"Circuit breaker open for '{server_name}'")

        for attempt in range(1, self._retry_config.max_attempts + 1):
            try:
                result = await self._client.call_tool(tool_name, arguments)
                breaker.record_success()
                return result
            except Exception as e:
                category = self.classify_error(e)
                if category == ErrorCategory.PERMANENT:
                    # Retrying cannot fix a malformed request or an unknown method
                    breaker.record_failure()
                    raise
                if category == ErrorCategory.TRANSIENT and attempt < self._retry_config.max_attempts:
                    delay = min(
                        self._retry_config.base_delay * (self._retry_config.exponential_base ** (attempt - 1)),
                        self._retry_config.max_delay,
                    )
                    if self._retry_config.jitter:
                        delay *= (0.5 + random.random() * 0.5)
                    logger.warning(
                        f"Tool '{tool_name}' call failed (attempt {attempt}/{self._retry_config.max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
                    continue
                # Either the retries are exhausted or the error is UNKNOWN. Unknown errors
                # are not retried, since repeating them without backoff only adds load.
                breaker.record_failure()
                if fallback := self._fallback_handlers.get(tool_name):
                    logger.warning(f"Call to '{tool_name}' failed ({e}), using fallback")
                    return await fallback(arguments)
                raise
        # Only reachable when max_attempts is less than 1
        raise ValueError("RetryConfig.max_attempts must be at least 1")
Usage Example
import asyncio
import json

async def fallback_search(arguments: dict) -> str:
    # Degraded response returned when the real tool is unavailable
    return json.dumps({"fallback": True, "query": arguments.get("query", ""), "results": []})

async def main():
    client = JsonRpcClient(timeout=30.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        # ResilientMcpClient only needs an object with call_tool(), which ToolDiscovery provides
        resilient = ResilientMcpClient(
            discovery,
            retry_config=RetryConfig(max_attempts=3, base_delay=1.0),
            circuit_config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
        )
        resilient.register_fallback("search_docs", fallback_search)
        try:
            result = await resilient.call_with_retry("search_docs", {"query": "MCP protocol parsing"})
            print(f"Result: {result}")
        except CircuitOpenError as e:
            print(f"Service unavailable: {e}")
        except Exception as e:
            print(f"Failed: {e}")
    finally:
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())
Pattern 7: Streaming Response Processing
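Problem: Long-running tool calls need real-time progress feedback. MCP reports progress through notifications/progress messages that the server sends while the request is still running, and log output through notifications/message. Both arrive on the same channel as the final result, whether the transport is stdio or SSE.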
Solution: Implement a streaming response handler with progress notifications, partial result aggregation, and cancellation support.
Streaming Response Architecture
┌──────────┐ tools/call ┌──────────┐
│ Client │ ────────────> │ Server │
│ │ │ │
│ Stream │ <── notify ── │ Tool │
│ Handler │ progress │ Running │
│ │ <── notify ── │ │
│ │ progress │ │
│ │ <── result ── │ │
│ │ complete │ │
└──────────┘ └──────────┘
Complete Implementation
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum

logger = logging.getLogger(__name__)

class StreamEventType(Enum):
    PROGRESS = "progress"
    PARTIAL_RESULT = "partial_result"
    COMPLETE = "complete"
    ERROR = "error"
    LOG = "log"

@dataclass
class StreamEvent:
    type: StreamEventType
    data: Any
    progress: float | None = None
    message: str | None = None
    # time.monotonic works with or without a running event loop
    timestamp: float = field(default_factory=time.monotonic)

@dataclass
class StreamResult:
    events: list[StreamEvent] = field(default_factory=list)
    final_result: Any = None
    success: bool = True
    error: str | None = None
    total_duration_ms: float = 0.0

Handler = Callable[[StreamEvent], Any]

class StreamHandler:
    def __init__(self):
        # One handler list per event type
        self._handlers: dict[StreamEventType, list[Handler]] = {t: [] for t in StreamEventType}

    def on_progress(self, handler: Handler):
        self._handlers[StreamEventType.PROGRESS].append(handler)

    def on_partial_result(self, handler: Handler):
        self._handlers[StreamEventType.PARTIAL_RESULT].append(handler)

    def on_complete(self, handler: Handler):
        self._handlers[StreamEventType.COMPLETE].append(handler)

    def on_error(self, handler: Handler):
        self._handlers[StreamEventType.ERROR].append(handler)

    def on_log(self, handler: Handler):
        self._handlers[StreamEventType.LOG].append(handler)

    def remove(self, handler: Handler):
        """Unregister a handler from every event type it was added to."""
        for handlers in self._handlers.values():
            while handler in handlers:
                handlers.remove(handler)

    async def emit(self, event: StreamEvent):
        """Run the handlers for this event type; a failing handler is logged, not raised."""
        for handler in list(self._handlers[event.type]):
            try:
                result = handler(event)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as e:
                logger.error(f"{event.type.value} handler error: {e}")

    async def handle_notification(self, message: dict[str, Any]):
        # Expects the full JSON-RPC notification: the method name is at the top level
        # and the payload sits under "params".
        method = message.get("method", "")
        params = message.get("params") or {}
        if method == "notifications/progress":
            # MCP sends progress and the optional total as plain numbers
            progress = params.get("progress", 0)
            total = params.get("total")
            await self.emit(StreamEvent(
                type=StreamEventType.PROGRESS,
                data=params,
                # Without a total the fraction is unknown and is reported as 0.0
                progress=progress / total if total else 0.0,
                message=params.get("message"),
            ))
        elif method == "notifications/message":
            level = params.get("level", "info")
            data = params.get("data")
            # data may be any JSON value, not only an object with a "text" key
            if isinstance(data, dict):
                text = data.get("text", "")
            else:
                text = "" if data is None else str(data)
            await self.emit(StreamEvent(
                type=StreamEventType.LOG,
                data=data,
                message=f"[{level}] {text}",
            ))

class StreamingMcpClient:
    def __init__(self, client, discovery: ToolDiscovery):
        self._client = client
        self._discovery = discovery
        self._stream_handler = StreamHandler()
        for method in ("notifications/progress", "notifications/message"):
            client.on_notification(method, self._stream_handler.handle_notification)

    @property
    def stream(self) -> StreamHandler:
        return self._stream_handler

    async def call_with_stream(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        timeout: float = 120.0,
    ) -> StreamResult:
        start = time.monotonic()
        result = StreamResult()
        # Collect progress and log events for this call. The collector is removed in
        # `finally` so handlers do not accumulate across calls. With several calls in
        # flight it also sees their events, because nothing here filters by progress token.
        collect = result.events.append
        self._stream_handler.on_progress(collect)
        self._stream_handler.on_log(collect)
        try:
            call_result = await asyncio.wait_for(
                self._discovery.call_tool(tool_name, arguments),
                timeout=timeout,
            )
            result.final_result = call_result
            result.success = True
            final_event = StreamEvent(type=StreamEventType.COMPLETE, data=call_result)
        except asyncio.TimeoutError:
            result.success = False
            result.error = f"Tool call timed out after {timeout}s"
            final_event = StreamEvent(type=StreamEventType.ERROR, data=None, message=result.error)
        except Exception as e:
            result.success = False
            result.error = str(e)
            final_event = StreamEvent(type=StreamEventType.ERROR, data=None, message=str(e))
        finally:
            self._stream_handler.remove(collect)
        result.events.append(final_event)
        # Notify on_complete / on_error subscribers
        await self._stream_handler.emit(final_event)
        result.total_duration_ms = (time.monotonic() - start) * 1000
        return result

    async def cancel(self, request_id: int | str):
        # request_id must be the JSON-RPC id of the in-flight tools/call request
        await self._client.send_notification("notifications/cancelled", {
            "requestId": request_id,
            "reason": "User cancelled",
        })
Usage Example
import asyncio

async def main():
    client = JsonRpcClient(timeout=120.0)
    await client.connect_stdio("python", ["mcp_server.py"])
    try:
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        streaming = StreamingMcpClient(client, discovery)
        # Register callbacks before starting the call
        streaming.stream.on_progress(lambda e: print(f"  Progress: {e.progress:.0%} - {e.message or ''}"))
        streaming.stream.on_log(lambda e: print(f"  Log: {e.message}"))
        streaming.stream.on_complete(lambda e: print("  Complete!"))
        streaming.stream.on_error(lambda e: print(f"  Error: {e.message}"))
        print("Calling long-running tool with streaming...")
        result = await streaming.call_with_stream(
            "batch_process",
            {"items": list(range(100)), "operation": "analyze"},
            timeout=120.0,
        )
        print(f"\nResult: success={result.success}, duration={result.total_duration_ms:.0f}ms")
        if result.final_result:
            print(f"Output: {str(result.final_result)[:200]}...")
    finally:
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())
5 Common Pitfalls and Solutions
1. Missing initialized Notification After Handshake
Pitfall: After sending the initialize request, you forget to send the notifications/initialized notification. A strict Server then treats the session as not yet initialized and may reject or ignore later requests, which the client sees as errors or timeouts.
Solution:
async def safe_initialize(client: JsonRpcClient) -> dict:
    result = await client.send_request("initialize", {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "1.0.0"},
    })
    # Must send initialized notification
    await client.send_notification("notifications/initialized")
    return result
2. Sending Requests Before SSE Endpoint is Ready
Pitfall: After establishing the SSE connection, you immediately send JSON-RPC requests before receiving the endpoint event, causing requests to hit the wrong URL.
Solution:
import asyncio
from typing import Any

class SafeSseClient:
    def __init__(self, server_url: str):
        self._transport = SseTransport(server_url)
        self._endpoint_ready = asyncio.Event()
        self._transport.on_event("endpoint", self._on_endpoint)

    async def _on_endpoint(self, data):
        # The server has announced the URL that requests must be posted to
        self._endpoint_ready.set()

    async def send_request(self, method: str, params: dict | None = None) -> Any:
        # Block until the endpoint event has arrived, or fail after 10 seconds
        await asyncio.wait_for(self._endpoint_ready.wait(), timeout=10.0)
        # Now safe to send requests
        ...
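The gating behaviour can be tried in isolation. The following sketch, with the hypothetical helper send_when_ready standing in for a real send, shows a request failing fast while the endpoint event is missing and going through once it has been set.

```python
import asyncio


async def send_when_ready(ready: asyncio.Event, timeout: float) -> str:
    """Wait for the endpoint event before 'sending'; report a timeout otherwise."""
    try:
        await asyncio.wait_for(ready.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        return "endpoint not received"
    return "sent"


async def demo() -> tuple[str, str]:
    ready = asyncio.Event()
    early = await send_when_ready(ready, timeout=0.05)  # endpoint event never arrived
    ready.set()  # simulate the SSE "endpoint" event
    late = await send_when_ready(ready, timeout=0.05)
    return early, late


print(asyncio.run(demo()))
# ('endpoint not received', 'sent')
```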
3. Missing required Field in Tool Schema Conversion
Pitfall: When converting MCP tool schemas to OpenAI function calling format, inputSchema is used directly as parameters, but some LLMs require the required field to be present.
Solution:
def safe_to_openai_schema(mcp_tool: McpTool) -> dict:
    # Copy so the registered tool's schema is not mutated; tolerate a missing schema
    schema = dict(mcp_tool.input_schema or {})
    schema.setdefault("required", [])
    schema.setdefault("type", "object")
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description,
            "parameters": schema,
        },
    }
4. Shared State Race Conditions in Parallel Calls
Pitfall: Multiple parallel tool calls share one client whose request IDs or pending asyncio.Future dictionary are not managed safely (for example, IDs that get reused across calls), so responses are matched to the wrong caller.
Solution: Use isolated request ID spaces or connection pool isolation for each parallel call:
class IsolatedClientPool:
    def __init__(self, base_url: str, pool_size: int = 5):
        self._clients = [McpSseClient(base_url) for _ in range(pool_size)]
        self._index = 0
        self._lock = asyncio.Lock()

    async def get_client(self) -> McpSseClient:
        # Round-robin over the pool
        async with self._lock:
            client = self._clients[self._index % len(self._clients)]
            self._index += 1
            return client
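The other option named above, an isolated request ID space, can be sketched without any pool. The PendingRequests class below is a hypothetical helper: it hands out IDs from a single counter and resolves futures by ID, so the order in which responses arrive does not matter. Because asyncio runs callbacks on one thread and register() contains no await, no lock is needed.

```python
import asyncio
import itertools


class PendingRequests:
    """Match responses to callers by unique request ID (illustrative helper)."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)  # never reuses an ID
        self._pending: dict[int, asyncio.Future] = {}

    def register(self) -> tuple[int, asyncio.Future]:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, request_id: int, result: object) -> None:
        future = self._pending.pop(request_id, None)
        if future is not None and not future.done():
            future.set_result(result)


async def demo() -> list[str]:
    pending = PendingRequests()
    id_a, fut_a = pending.register()
    id_b, fut_b = pending.register()
    # Responses arrive in reverse order, as they may over a shared connection
    pending.resolve(id_b, "result for B")
    pending.resolve(id_a, "result for A")
    return [await fut_a, await fut_b]


print(asyncio.run(demo()))
# ['result for A', 'result for B']
```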
5. Confusing Progress Notifications with Final Results
Pitfall: In the SSE channel, progress notifications and final results share the same stream. The client treats progress notifications as final results.
Solution: Strictly distinguish message types. Only process responses with an id field as request results:
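async def _handle_sse_message(self, message: dict):
    if "method" in message:
        if "id" not in message:
            # This is a notification, forward to stream handler
            await self._stream_handler.handle_notification(message)
        # A message with both method and id is a server-to-client request; not handled here
        return
    if "id" in message:
        # This is a request response, match pending future
        future = self._pending.pop(message["id"], None)
        if future and not future.done():
            if "error" in message:
                # Surface JSON-RPC errors instead of resolving with None.
                # Swap RuntimeError for the client's own error type if it has one.
                error = message["error"]
                future.set_exception(
                    RuntimeError(f"MCP Error [{error.get('code')}]: {error.get('message')}")
                )
            else:
                future.set_result(message.get("result"))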
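The rule follows directly from JSON-RPC 2.0: a request carries both method and id, a notification carries method but no id, and a response carries id plus either result or error. A small standalone classifier makes the distinction explicit; the function name and the returned labels are chosen for this sketch only.

```python
def classify_message(message: dict) -> str:
    """Classify a decoded JSON-RPC 2.0 message by the fields it carries."""
    has_id = "id" in message
    if "method" in message:
        return "request" if has_id else "notification"
    if has_id and "error" in message:
        return "error_response"
    if has_id and "result" in message:
        return "response"
    return "invalid"


samples = [
    {"jsonrpc": "2.0", "method": "notifications/progress", "params": {"progress": 1}},
    {"jsonrpc": "2.0", "id": 3, "result": {"content": []}},
    {"jsonrpc": "2.0", "id": 4, "error": {"code": -32602, "message": "Invalid params"}},
    {"jsonrpc": "2.0", "id": 5, "method": "ping"},
]
print([classify_message(m) for m in samples])
# ['notification', 'response', 'error_response', 'request']
```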
10 Common Error Troubleshooting
| Error Message | Cause | Solution |
|---|---|---|
| MCP Error [-32600]: Invalid Request | Malformed JSON-RPC message, missing required fields | Check request has jsonrpc, method, id fields; use /en/json/format to validate |
| MCP Error [-32601]: Method not found | Called a method the Server doesn't support | Call initialize first to confirm Server capabilities; check method spelling |
| MCP Error [-32602]: Invalid params | Tool parameter type or structure mismatch | Use tools/list to get correct inputSchema; verify parameters against it |
| Connection refused | MCP Server not running or wrong port | Confirm Server process is running; check port and URL config |
| SSE connection timeout | Network unreachable or Server not responding to SSE handshake | Check firewall rules; confirm SSE endpoint is reachable |
| Request timed out after 30s | Tool execution exceeds client timeout setting | Increase timeout parameter or use streaming responses for progress |
| Circuit breaker open | Consecutive failures exceed circuit breaker threshold | Check Server health; wait for recovery timeout before retrying |
| Tool not found: xxx | Called an unregistered tool name | Re-run tools/list discovery; check tool name case sensitivity |
| Endpoint not received | SSE connection established but no endpoint event received | Confirm Server sends endpoint event correctly; check SSE route config |
| JSON decode error in SSE stream | SSE data is not valid JSON | Check Server-side SSE event format; ensure data field contains valid JSON |
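When logging failures it helps to turn the numeric codes in the first three rows into names. The lookup below covers the error codes reserved by JSON-RPC 2.0; the function name describe_error is an assumption for this sketch, and codes outside the reserved ranges are simply reported as application-defined.

```python
JSONRPC_ERRORS = {
    -32700: "Parse error",
    -32600: "Invalid Request",
    -32601: "Method not found",
    -32602: "Invalid params",
    -32603: "Internal error",
}


def describe_error(code: int) -> str:
    """Map a JSON-RPC error code to a readable name."""
    if code in JSONRPC_ERRORS:
        return JSONRPC_ERRORS[code]
    if -32099 <= code <= -32000:
        # Range reserved by JSON-RPC 2.0 for implementation-defined server errors
        return "Server error (implementation-defined)"
    return "Application-defined error"


for code in (-32601, -32000, 42):
    print(code, describe_error(code))
```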
Advanced Optimization Tips
1. Connection Pool and Multi-Server Management
In production, AI Agents typically need to connect to multiple MCP Servers simultaneously. Use a connection pool to manage multiple client instances:
from typing import Any

class McpConnectionPool:
    def __init__(self):
        self._connections: dict[str, Any] = {}
        self._discovery_map: dict[str, ToolDiscovery] = {}

    async def connect_stdio(self, name: str, command: str, args: list[str] | None = None):
        client = JsonRpcClient(timeout=30.0)
        await client.connect_stdio(command, args)
        await initialize_client(client)
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    async def connect_sse(self, name: str, url: str):
        client = McpSseClient(url)
        await client.connect()
        await client.initialize()
        discovery = ToolDiscovery(client)
        await discovery.discover()
        self._connections[name] = client
        self._discovery_map[name] = discovery

    def get_discovery(self, name: str) -> ToolDiscovery | None:
        return self._discovery_map.get(name)

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        # The first connected server that registers the tool handles the call
        for discovery in self._discovery_map.values():
            if discovery.registry.get(tool_name):
                return await discovery.call_tool(tool_name, arguments)
        raise ValueError(f"Tool '{tool_name}' not found in any connected server")

    async def close_all(self):
        for client in self._connections.values():
            await client.close()
2. Tool Call Caching
Cache idempotent tool call results to reduce duplicate requests:
import hashlib
import json
import time
from typing import Any

class ToolCallCache:
    def __init__(self, ttl: float = 300.0, max_size: int = 1000):
        self._cache: dict[tuple[str, str], tuple[float, Any]] = {}
        self._ttl = ttl
        self._max_size = max_size

    def _make_key(self, tool_name: str, arguments: dict) -> tuple[str, str]:
        # Keep the tool name outside the hash so invalidate() can match every entry
        # for a tool; a hash prefix cannot be used for that.
        raw = json.dumps(arguments, sort_keys=True)
        return tool_name, hashlib.sha256(raw.encode()).hexdigest()

    def get(self, tool_name: str, arguments: dict) -> tuple[bool, Any]:
        key = self._make_key(tool_name, arguments)
        if key in self._cache:
            ts, value = self._cache[key]
            if time.monotonic() - ts < self._ttl:
                return True, value
            del self._cache[key]  # expired
        return False, None

    def set(self, tool_name: str, arguments: dict, value: Any):
        key = self._make_key(tool_name, arguments)
        # Evict the oldest entry only when a new key would exceed the size limit
        if key not in self._cache and len(self._cache) >= self._max_size:
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        self._cache[key] = (time.monotonic(), value)

    def invalidate(self, tool_name: str, arguments: dict | None = None):
        if arguments is not None:
            # Drop one specific call (an empty dict is a valid argument set)
            self._cache.pop(self._make_key(tool_name, arguments), None)
        else:
            # Drop every cached call of this tool
            keys_to_remove = [k for k in self._cache if k[0] == tool_name]
            for k in keys_to_remove:
                del self._cache[k]
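The cache only works if equal calls produce equal keys. The short check below, using a standalone cache_key function written for this illustration, shows why sort_keys=True matters: the same arguments in a different order hash to the same key, while a changed value does not.

```python
import hashlib
import json


def cache_key(tool_name: str, arguments: dict) -> str:
    """Hash the tool name and arguments with a stable key order."""
    raw = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()


a = cache_key("web_search", {"query": "mcp", "limit": 5})
b = cache_key("web_search", {"limit": 5, "query": "mcp"})  # same call, different order
c = cache_key("web_search", {"query": "mcp", "limit": 6})  # different call
print(a == b, a == c)
# True False
```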
3. Observability and Tracing
Add OpenTelemetry tracing to MCP client calls:
import time
from typing import Any

from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("mcp-client", "1.0.0")
meter = metrics.get_meter("mcp-client", "1.0.0")
tool_call_counter = meter.create_counter("mcp.tool.calls", description="MCP tool call count")
tool_call_duration = meter.create_histogram("mcp.tool.duration", unit="s", description="MCP tool call duration")

class ObservableMcpClient:
    def __init__(self, discovery: ToolDiscovery):
        self._discovery = discovery

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        with tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
            span.set_attribute("mcp.tool.name", tool_name)
            span.set_attribute("mcp.tool.args_count", len(arguments))
            start = time.monotonic()
            try:
                result = await self._discovery.call_tool(tool_name, arguments)
                duration = time.monotonic() - start
                span.set_status(Status(StatusCode.OK))
                tool_call_counter.add(1, {"tool": tool_name, "status": "success"})
                tool_call_duration.record(duration, {"tool": tool_name})
                return result
            except Exception as e:
                # Record the failure on the span and in the metrics, then re-raise
                duration = time.monotonic() - start
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                tool_call_counter.add(1, {"tool": tool_name, "status": "error"})
                tool_call_duration.record(duration, {"tool": tool_name})
                raise
Comparison: 3 Orchestration Approaches
| Dimension | Sequential | Parallel | DAG |
|---|---|---|---|
| Execution | One after another | All tasks simultaneously | Topologically sorted by dependencies |
| Use Case | Strict data dependencies between tools | Completely independent tools | Partial dependencies, partial independence |
| Latency | Sum of all tool durations | Duration of slowest tool | Sum of critical path durations |
| Error Propagation | Previous failure skips all subsequent | Single failure doesn't affect others | Downstream of failed node is skipped |
| Complexity | Low | Medium | High |
| Resource Usage | Low (one connection at a time) | High (concurrent connection pool) | Medium (layered concurrency) |
| Result Consistency | Strong (deterministic order) | Weak (non-deterministic completion order) | Medium (non-deterministic within layers) |
| Timeout Control | Per-step independent timeout | Global + per-task timeout | Per-step + global timeout |
| Typical Scenario | Search -> Extract -> Translate | Search 3 data sources simultaneously | Search A+B -> Merge -> Translate |
Selection Guide:
- Strict causal dependencies between tools → Sequential orchestration
- Completely independent tools → Parallel orchestration
- Partial dependencies between tools → DAG orchestration
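The Latency row can be checked with a little arithmetic. The sketch below uses made-up durations for the "Search A+B -> Merge -> Translate" scenario and assumes unlimited concurrency and an acyclic graph; the function names are illustrative.

```python
def sequential_latency(durations: dict[str, float]) -> float:
    """Every task runs after the previous one: total is the sum."""
    return sum(durations.values())


def parallel_latency(durations: dict[str, float]) -> float:
    """Independent tasks run together: total is the slowest task."""
    return max(durations.values())


def dag_latency(durations: dict[str, float], depends_on: dict[str, list[str]]) -> float:
    """Total is the longest dependency chain (the critical path)."""
    finish: dict[str, float] = {}

    def finish_time(name: str) -> float:
        if name not in finish:
            # A task starts when its slowest prerequisite has finished
            start = max((finish_time(d) for d in depends_on.get(name, [])), default=0.0)
            finish[name] = start + durations[name]
        return finish[name]

    return max(finish_time(name) for name in durations)


durations = {"search_a": 2.0, "search_b": 3.0, "merge": 1.0, "translate": 1.5}
depends_on = {"merge": ["search_a", "search_b"], "translate": ["merge"]}
searches = {k: durations[k] for k in ("search_a", "search_b")}

print(sequential_latency(durations))       # 7.5
print(parallel_latency(searches))          # 3.0 (the two searches alone)
print(dag_latency(durations, depends_on))  # 5.5 = 3.0 + 1.0 + 1.5
```

In this example the DAG saves 2 seconds over the sequential run, which is exactly the duration of the faster search that now overlaps with the slower one.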
For more on AI Agent workflow orchestration, see AI Agent Workflow DAG Orchestration and AI Agent Protocols Comparison.
Recommended Online Tools
- JSON Formatter: When debugging MCP protocol messages, use /en/json/format to format JSON-RPC requests and responses
- cURL to Code: When testing MCP Server endpoints, use /en/dev/curl-to-code to quickly generate Python client code
- Base64 Encode/Decode: When handling binary data in MCP transport, use /en/encode/base64 for encoding and decoding
Summary: Python MCP client integration goes far beyond sending JSON-RPC requests. From basic stdio connections to SSE remote transport, from tool discovery and dynamic registration to multi-tool sequential/parallel/DAG orchestration, from error handling with circuit breakers and fallbacks to streaming responses with progress tracking, every aspect requires production-grade engineering. Master these 7 patterns, and your AI Agent will have reliable, efficient, and observable tool calling capabilities. Remember: MCP protocol parsing is the foundation, multi-tool orchestration is the core, error handling is the safeguard, and streaming responses are the experience. Refer to the official Model Context Protocol Specification for the latest protocol definitions.