Python LangGraphマルチエージェント協調:状態マシンからワークフローオーケストレーションまでの5つの実践パターン
AI与大数据
AIエージェントはQ&Aしかできず、複雑なタスクは手動分解に頼るしかない
ユーザーが「競合分析してレポートを生成して」と言っても、エージェントは一歩ずつ質問するしかない;3つのエージェントが「調査→分析→執筆」パイプラインで協調する必要があるが、既製のオーケストレーションフレームワークがない;エージェントが実行途中で人間の確認が必要だが、一時停止と再開の方法がわからない。シングルエージェントの時代は終わった——2026年、LangGraphはマルチエージェント協調を手作業の継ぎ接ぎから宣言的オーケストレーションへと変えた。
本記事はLangGraph状態グラフの基礎から出発し、状態マシン→マルチエージェントオーケストレーション→条件ルーティング→ヒューマンインザループ→永続状態の5つの実践パターンをガイドする。
LangGraphコア概念
| 概念 | 説明 |
|---|---|
| StateGraph | 状態グラフ、ワークフローのノードとエッジを定義 |
| State | 状態、ワークフローノード間で渡される共有データ構造 |
| Node | ノード、特定ロジックを実行する関数、Stateを受け取り更新を返す |
| Edge | エッジ、ノード間の遷移関係を定義 |
| Conditional Edge | 条件エッジ、Stateに基づき次のノードを動的に決定 |
| Checkpoint | チェックポイント、Stateを永続化、一時停止/再開をサポート |
| Interrupt | 割り込み、ワークフローを一時停止して外部入力を待機(ヒューマンインザループ) |
| Tool Node | ツールノード、外部ツール呼び出しをカプセル化 |
| Subgraph | サブグラフ、複雑なワークフローを再利用可能モジュールとしてカプセル化 |
| Command | コマンドオブジェクト、ノード間の状態更新とルーティング制御をサポート |
ワークフロー実行フロー
1. Stateを定義(TypedDictまたはPydantic Model)
2. StateGraph(State)を作成
3. ノードを追加:graph.add_node("name", function)
4. エッジを追加:graph.add_edge("node_a", "node_b")
5. 条件エッジを追加:graph.add_conditional_edges("node_a", router)
6. エントリポイントを設定:graph.set_entry_point("start")
7. グラフをコンパイル:app = graph.compile(checkpointer=...)
8. 実行:app.invoke({"input": ...}, config={"configurable": {"thread_id": "..."}})
問題分析:マルチエージェント協調の5つの課題
- 状態管理の混乱:複数エージェント間の共有状態、手動渡しは漏れや競合が発生しやすい
- ワークフローオーケストレーションの複雑さ:条件分岐、ループ、並列実行、ハードコードif-elseは保守困難
- ヒューマンインザループの困難:エージェントが人間の確認を必要とする際、エレガントな一時停止/再開方法がない
- エラー回復の脆弱性:長いワークフローが途中で失敗、最初からやり直すコストが大きい
- 観測可能性の欠落:マルチエージェント協調の実行プロセスがブラックボックス、デバッグが困難
ステップバイステップ:5つの実践パターン
パターン1:基本状態マシン——シングルエージェントワークフロー
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
research_notes: str
summary: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def research_node(state: ResearchState) -> dict:
messages = [
SystemMessage(content="あなたは専門リサーチャーです。与えられたトピックについて深く研究し、詳細なリサーチノートを出力してください。"),
HumanMessage(content=f"以下のトピックについて深く研究してください:{state['topic']}"),
]
response = llm.invoke(messages)
return {"research_notes": response.content}
def summarize_node(state: ResearchState) -> dict:
messages = [
SystemMessage(content="あなたは専門エディターです。リサーチノートを簡潔な要約にまとめてください。"),
HumanMessage(content=f"以下のリサーチノートを要約してください:\n\n{state['research_notes']}"),
]
response = llm.invoke(messages)
return {"summary": response.content}
def format_node(state: ResearchState) -> dict:
formatted = f"""# リサーチレポート:{state['topic']}
## リサーチノート
{state['research_notes']}
## 要約
{state['summary']}
"""
return {"messages": [HumanMessage(content=formatted)]}
graph = StateGraph(ResearchState)
graph.add_node("research", research_node)
graph.add_node("summarize", summarize_node)
graph.add_node("format", format_node)
graph.add_edge(START, "research")
graph.add_edge("research", "summarize")
graph.add_edge("summarize", "format")
graph.add_edge("format", END)
app = graph.compile()
result = app.invoke({
"messages": [],
"topic": "2026年大規模言語モデル技術トレンド",
"research_notes": "",
"summary": "",
})
print(result["messages"][-1].content)
パターン2:マルチエージェント協調——スーパーバイザーパターン
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class CollaborationState(TypedDict):
messages: Annotated[list, add_messages]
task: str
next_agent: str
research_result: str
analysis_result: str
writing_result: str
review_feedback: str
iteration_count: int
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def supervisor_node(state: CollaborationState) -> dict:
if state["iteration_count"] >= 3:
return {"next_agent": "end"}
if not state["research_result"]:
return {"next_agent": "researcher"}
if not state["analysis_result"]:
return {"next_agent": "analyst"}
if not state["writing_result"]:
return {"next_agent": "writer"}
if not state["review_feedback"]:
return {"next_agent": "reviewer"}
if "needs revision" in state["review_feedback"].lower():
return {
"next_agent": "writer",
"writing_result": "",
"review_feedback": "",
"iteration_count": state["iteration_count"] + 1,
}
return {"next_agent": "end"}
def researcher_node(state: CollaborationState) -> dict:
messages = [
SystemMessage(content="あなたはリサーチャーエージェントです。タスクに関連する情報を収集・整理してください。"),
HumanMessage(content=f"リサーチタスク:{state['task']}"),
]
response = llm.invoke(messages)
return {"research_result": response.content}
def analyst_node(state: CollaborationState) -> dict:
messages = [
SystemMessage(content="あなたはアナリストエージェントです。リサーチ結果に基づいて深い分析を行ってください。"),
HumanMessage(content=f"以下のリサーチ結果に基づいて分析してください:\n\n{state['research_result']}"),
]
response = llm.invoke(messages)
return {"analysis_result": response.content}
def writer_node(state: CollaborationState) -> dict:
context = f"リサーチ結果:{state['research_result']}\n\n分析結果:{state['analysis_result']}"
if state["review_feedback"]:
context += f"\n\n修正フィードバック:{state['review_feedback']}"
messages = [
SystemMessage(content="あなたはライターエージェントです。リサーチと分析結果に基づいて高品質な記事を執筆してください。"),
HumanMessage(content=f"記事を執筆してください:\n\n{context}"),
]
response = llm.invoke(messages)
return {"writing_result": response.content}
def reviewer_node(state: CollaborationState) -> dict:
messages = [
SystemMessage(content="あなたはレビューアージェントです。記事の品質をレビューしてください。修正が必要な場合は具体的なフィードバックを、満足なら 'approved' と言ってください。"),
HumanMessage(content=f"以下の記事をレビューしてください:\n\n{state['writing_result']}"),
]
response = llm.invoke(messages)
return {"review_feedback": response.content}
def route_after_supervisor(state: CollaborationState) -> str:
next_agent = state["next_agent"]
if next_agent == "end":
return END
return next_agent
graph = StateGraph(CollaborationState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("analyst", analyst_node)
graph.add_node("writer", writer_node)
graph.add_node("reviewer", reviewer_node)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_after_supervisor, {
"researcher": "researcher",
"analyst": "analyst",
"writer": "writer",
"reviewer": "reviewer",
END: END,
})
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")
app = graph.compile()
result = app.invoke({
"messages": [],
"task": "2026年AIエージェント技術トレンドを分析してレポートを作成",
"next_agent": "",
"research_result": "",
"analysis_result": "",
"writing_result": "",
"review_feedback": "",
"iteration_count": 0,
})
print(result["writing_result"])
パターン3:条件ルーティング——動的ワークフロー
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json
class SupportState(TypedDict):
messages: Annotated[list, add_messages]
user_input: str
intent: str
category: str
response: str
escalated: bool
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def classify_intent_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="""あなたはカスタマーサービスインテント分類器です。ユーザー入力を分析し、JSON形式で返してください:
{"intent": "technical|billing|general|complaint", "category": "具体的な分類", "escalated": false}
ユーザーが感情的または問題が深刻な場合、escalatedをtrueに設定してください。"""),
HumanMessage(content=state["user_input"]),
]
response = llm.invoke(messages)
try:
result = json.loads(response.content)
return {
"intent": result.get("intent", "general"),
"category": result.get("category", ""),
"escalated": result.get("escalated", False),
}
except json.JSONDecodeError:
return {"intent": "general", "category": "未分類", "escalated": False}
def technical_support_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="あなたはテクニカルサポートエージェントです。専門的な技術問題のソリューションを提供してください。"),
HumanMessage(content=f"ユーザーの問題:{state['user_input']}\nカテゴリ:{state['category']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def billing_support_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="あなたは請求サポートエージェントです。請求関連の問題(返金、料金照会など)を処理してください。"),
HumanMessage(content=f"ユーザーの問題:{state['user_input']}\nカテゴリ:{state['category']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def general_support_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="あなたは一般サポートエージェントです。一般的な問い合わせを処理してください。"),
HumanMessage(content=f"ユーザーの問題:{state['user_input']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def escalation_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="あなたはシニアサポートエージェントです。エスカレーションが必要な複雑または緊急の問題を処理してください。"),
HumanMessage(content=f"緊急問題:{state['user_input']}\nカテゴリ:{state['category']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def route_by_intent(state: SupportState) -> str:
if state["escalated"]:
return "escalation"
intent_map = {
"technical": "technical",
"billing": "billing",
"general": "general",
"complaint": "escalation",
}
return intent_map.get(state["intent"], "general")
graph = StateGraph(SupportState)
graph.add_node("classify", classify_intent_node)
graph.add_node("technical", technical_support_node)
graph.add_node("billing", billing_support_node)
graph.add_node("general", general_support_node)
graph.add_node("escalation", escalation_node)
graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route_by_intent, {
"technical": "technical",
"billing": "billing",
"general": "general",
"escalation": "escalation",
})
graph.add_edge("technical", END)
graph.add_edge("billing", END)
graph.add_edge("general", END)
graph.add_edge("escalation", END)
app = graph.compile()
result = app.invoke({
"messages": [],
"user_input": "サーバーに突然アクセスできなくなりました、データベース接続タイムアウト、非常に緊急です!",
"intent": "",
"category": "",
"response": "",
"escalated": False,
})
print(result["response"])
パターン4:ヒューマンインザループ
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ApprovalState(TypedDict):
messages: Annotated[list, add_messages]
task: str
draft: str
human_feedback: str
final_result: str
approved: bool
llm = ChatOpenAI(model="gpt-4o", temperature=0)
checkpointer = MemorySaver()
def generate_draft_node(state: ApprovalState) -> dict:
messages = [
SystemMessage(content="あなたはコンテンツ作成エージェントです。タスク要件に基づいてドラフトを生成してください。"),
HumanMessage(content=f"タスク:{state['task']}"),
]
response = llm.invoke(messages)
return {"draft": response.content}
def human_review_node(state: ApprovalState) -> dict:
return {}
def process_feedback_node(state: ApprovalState) -> dict:
if state["approved"]:
return {"final_result": state["draft"]}
messages = [
SystemMessage(content="あなたはコンテンツ修正エージェントです。フィードバックに基づいてドラフトを修正してください。"),
HumanMessage(content=f"元のドラフト:{state['draft']}\n\n修正フィードバック:{state['human_feedback']}"),
]
response = llm.invoke(messages)
return {"draft": response.content, "human_feedback": ""}
def should_continue(state: ApprovalState) -> str:
if state["approved"]:
return "end"
return "revise"
graph = StateGraph(ApprovalState)
graph.add_node("generate_draft", generate_draft_node)
graph.add_node("human_review", human_review_node)
graph.add_node("process_feedback", process_feedback_node)
graph.add_edge(START, "generate_draft")
graph.add_edge("generate_draft", "human_review")
graph.add_conditional_edges("human_review", should_continue, {
"revise": "process_feedback",
"end": END,
})
graph.add_edge("process_feedback", "human_review")
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"],
)
thread_id = "approval-001"
config = {"configurable": {"thread_id": thread_id}}
result = app.invoke({
"messages": [],
"task": "2026年AI業界トレンドレポートを作成",
"draft": "",
"human_feedback": "",
"final_result": "",
"approved": False,
}, config=config)
current_state = app.get_state(config)
print("ドラフト内容:", current_state.values.get("draft", ""))
app.update_state(config, {
"human_feedback": "マルチモーダルモデルに関する内容を追加してください",
"approved": False,
})
app.invoke(None, config=config)
current_state = app.get_state(config)
print("修正後ドラフト:", current_state.values.get("draft", ""))
app.update_state(config, {"approved": True})
final_result = app.invoke(None, config=config)
print("最終結果:", final_result["final_result"])
パターン5:永続状態——PostgreSQL Checkpointer
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import asyncio
from psycopg_pool import AsyncConnectionPool
class LongRunningState(TypedDict):
messages: Annotated[list, add_messages]
task: str
step1_result: str
step2_result: str
step3_result: str
current_step: int
error: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def step1_node(state: LongRunningState) -> dict:
try:
messages = [
SystemMessage(content="あなたはデータ処理エージェントです。ステップ1:データ収集とクリーニングを実行してください。"),
HumanMessage(content=f"処理タスク:{state['task']}"),
]
response = llm.invoke(messages)
return {"step1_result": response.content, "current_step": 1, "error": ""}
except Exception as e:
return {"error": str(e), "current_step": state["current_step"]}
def step2_node(state: LongRunningState) -> dict:
try:
messages = [
SystemMessage(content="あなたは分析エージェントです。ステップ2:データ分析とモデリングを実行してください。"),
HumanMessage(content=f"ステップ1の結果に基づいて分析:{state['step1_result']}"),
]
response = llm.invoke(messages)
return {"step2_result": response.content, "current_step": 2, "error": ""}
except Exception as e:
return {"error": str(e), "current_step": state["current_step"]}
def step3_node(state: LongRunningState) -> dict:
try:
messages = [
SystemMessage(content="あなたはレポートエージェントです。ステップ3:最終レポートを生成してください。"),
HumanMessage(content=f"分析結果に基づいてレポートを生成:{state['step2_result']}"),
]
response = llm.invoke(messages)
return {"step3_result": response.content, "current_step": 3, "error": ""}
except Exception as e:
return {"error": str(e), "current_step": state["current_step"]}
graph = StateGraph(LongRunningState)
graph.add_node("step1", step1_node)
graph.add_node("step2", step2_node)
graph.add_node("step3", step3_node)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)
async def run_with_persistence():
connection_string = "postgresql://user:pass@localhost:5432/langgraph"
async with AsyncConnectionPool(connection_string) as pool:
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
app = graph.compile(checkpointer=checkpointer)
thread_id = "long-running-task-001"
config = {"configurable": {"thread_id": thread_id}}
result = await app.ainvoke({
"messages": [],
"task": "Q1売上データを分析して予測レポートを生成",
"step1_result": "",
"step2_result": "",
"step3_result": "",
"current_step": 0,
"error": "",
}, config=config)
state = await app.aget_state(config)
print(f"現在のステップ: {state.values['current_step']}")
print(f"最終結果: {state.values.get('step3_result', '未完了')}")
if state.values.get("error"):
print(f"ステップ {state.values['current_step']} から再開...")
result = await app.ainvoke(None, config=config)
asyncio.run(run_with_persistence())
落とし穴ガイド
落とし穴1:State内の可変オブジェクトを直接変更
# ❌ 誤り:state内のリストを直接変更
def bad_node(state: MyState) -> dict:
state["items"].append("new_item") # 元のstateを直接変更
return state
# ✅ 正しい:新しい値を返し、LangGraphのreducerに処理させる
def good_node(state: MyState) -> dict:
return {"items": state["items"] + ["new_item"]}
# またはAnnotated + reducerを使用
# items: Annotated[list, operator.add]
落とし穴2:条件エッジが存在しないノード名を返す
# ❌ 誤り:ルーター関数が未登録のノード名を返す
def bad_router(state: MyState) -> str:
return "non_existent_node"
graph.add_conditional_edges("start", bad_router)
# ✅ 正しい:ルーターは登録済みノード名のみを返し、マッピングにすべての可能性を列挙
def good_router(state: MyState) -> str:
if state["intent"] == "tech":
return "technical"
return "general"
graph.add_conditional_edges("start", good_router, {
"technical": "technical",
"general": "general",
})
落とし穴3:checkpointerの設定忘れで再開不可
# ❌ 誤り:checkpointerなし、interrupt_beforeが動作しない
app = graph.compile(interrupt_before=["human_review"])
# ✅ 正しい:checkpointerを必ず提供
from langgraph.checkpoint.memory import MemorySaver
app = graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["human_review"],
)
落とし穴4:ノード関数が不完全な状態更新を返す
# ❌ 誤り:ノードがNoneまたは空dictを返し、状態が失われる
def bad_node(state: MyState) -> dict:
result = do_something()
# 状態更新の返し忘れ
return {}
# ✅ 正しい:ノードは更新が必要な状態フィールドを返す必要がある
def good_node(state: MyState) -> dict:
result = do_something()
return {"result": result, "status": "completed"}
落とし穴5:非同期環境で同期checkpointerを使用
# ❌ 誤り:非同期環境で同期MemorySaverを使用
from langgraph.checkpoint.memory import MemorySaver
app = graph.compile(checkpointer=MemorySaver())
await app.ainvoke(input_data, config=config)
# ✅ 正しい:非同期環境では非同期checkpointerを使用
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
app = graph.compile(checkpointer=AsyncPostgresSaver(pool))
await app.ainvoke(input_data, config=config)
エラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決方法 |
|---|---|---|---|
| 1 | KeyError: 'field_name' |
Stateに必須フィールドが欠落 | 初期invokeにすべてのTypedDictフィールドを含める |
| 2 | ValueError: Node 'xxx' not found |
条件エッジが未登録ノードを参照 | add_nodeとadd_conditional_edgesのノード名を確認 |
| 3 | GraphRecursionError |
グラフに無限ループが存在 | ループカウンターまたは終了条件を追加 |
| 4 | Missing checkpointer |
interrupt使用時にcheckpointerなし | コンパイル時にcheckpointerパラメータを渡す |
| 5 | InvalidStateUpdate |
ノードがStateに存在しないフィールドを返す | 返されるキーがTypedDict定義と一致することを確認 |
| 6 | asyncio.run() cannot be called from a running event loop |
Jupyterでasyncio.runを呼び出し | awaitまたはnest_asyncioを使用 |
| 7 | psycopg.OperationalError |
PostgreSQL接続失敗 | 接続文字列を確認、データベースが実行中か確認 |
| 8 | TypeError: 'NoneType' object is not subscriptable |
ノードがNoneを返す | ノード関数がdictを返すことを確認 |
| 9 | LangGraphError: Cannot resume without thread_id |
再開時にthread_idが欠落 | configでthread_idを提供 |
| 10 | RateLimitError from OpenAI |
API呼び出しレート制限超過 | リトライロジックを追加または同時実行を減らす |
高度な最適化
1. サブグラフカプセル化——再利用可能なワークフローモジュール
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ResearchSubState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
research_output: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def deep_research_node(state: ResearchSubState) -> dict:
messages = [
SystemMessage(content="あなたはディープリサーチャーです。包括的な深い研究を行ってください。"),
HumanMessage(content=f"ディープリサーチ:{state['topic']}"),
]
response = llm.invoke(messages)
return {"research_output": response.content}
def fact_check_node(state: ResearchSubState) -> dict:
messages = [
SystemMessage(content="あなたはファクトチェッカーです。リサーチ結果の正確性を検証してください。"),
HumanMessage(content=f"以下の内容をファクトチェック:{state['research_output']}"),
]
response = llm.invoke(messages)
return {"research_output": f"{state['research_output']}\n\nファクトチェック:{response.content}"}
research_subgraph = StateGraph(ResearchSubState)
research_subgraph.add_node("deep_research", deep_research_node)
research_subgraph.add_node("fact_check", fact_check_node)
research_subgraph.add_edge(START, "deep_research")
research_subgraph.add_edge("deep_research", "fact_check")
research_subgraph.add_edge("fact_check", END)
research_app = research_subgraph.compile()
class MainState(TypedDict):
messages: Annotated[list, add_messages]
task: str
research_result: str
writing_result: str
def research_coordinator_node(state: MainState) -> dict:
result = research_app.invoke({
"messages": [],
"topic": state["task"],
"research_output": "",
})
return {"research_result": result["research_output"]}
def writing_node(state: MainState) -> dict:
messages = [
SystemMessage(content="あなたはライターエージェントです。リサーチ結果に基づいて記事を執筆してください。"),
HumanMessage(content=f"リサーチ結果に基づいて執筆:{state['research_result']}"),
]
response = llm.invoke(messages)
return {"writing_result": response.content}
main_graph = StateGraph(MainState)
main_graph.add_node("research_coordinator", research_coordinator_node)
main_graph.add_node("writer", writing_node)
main_graph.add_edge(START, "research_coordinator")
main_graph.add_edge("research_coordinator", "writer")
main_graph.add_edge("writer", END)
main_app = main_graph.compile()
2. 並列ノード実行
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ParallelState(TypedDict):
messages: Annotated[list, add_messages]
task: str
tech_analysis: str
market_analysis: str
competitor_analysis: str
final_report: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def tech_analysis_node(state: ParallelState) -> dict:
messages = [
SystemMessage(content="あなたは技術分析エージェントです。"),
HumanMessage(content=f"技術分析:{state['task']}"),
]
response = llm.invoke(messages)
return {"tech_analysis": response.content}
def market_analysis_node(state: ParallelState) -> dict:
messages = [
SystemMessage(content="あなたは市場分析エージェントです。"),
HumanMessage(content=f"市場分析:{state['task']}"),
]
response = llm.invoke(messages)
return {"market_analysis": response.content}
def competitor_analysis_node(state: ParallelState) -> dict:
messages = [
SystemMessage(content="あなたは競合分析エージェントです。"),
HumanMessage(content=f"競合分析:{state['task']}"),
]
response = llm.invoke(messages)
return {"competitor_analysis": response.content}
def merge_node(state: ParallelState) -> dict:
combined = f"""# 総合分析レポート
## 技術分析
{state['tech_analysis']}
## 市場分析
{state['market_analysis']}
## 競合分析
{state['competitor_analysis']}
"""
return {"final_report": combined}
graph = StateGraph(ParallelState)
graph.add_node("tech", tech_analysis_node)
graph.add_node("market", market_analysis_node)
graph.add_node("competitor", competitor_analysis_node)
graph.add_node("merge", merge_node)
graph.add_edge(START, "tech")
graph.add_edge(START, "market")
graph.add_edge(START, "competitor")
graph.add_edge("tech", "merge")
graph.add_edge("market", "merge")
graph.add_edge("competitor", "merge")
graph.add_edge("merge", END)
app = graph.compile()
3. ツール呼び出し統合
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
@tool
def search_database(query: str) -> str:
"""データベースを検索して情報を取得"""
mock_results = {
"revenue": "2026年Q1売上:1.2億元、前年同期比35%増",
"users": "現在のアクティブユーザー:580万、月間成長12%",
"products": "製品ライン:3つのコア製品、12のSKU",
}
for key, value in mock_results.items():
if key in query.lower():
return value
return "関連データが見つかりません"
@tool
def calculate_metrics(expression: str) -> str:
"""ビジネスメトリクスを計算"""
try:
result = eval(expression, {"__builtins__": {}}, {})
return f"計算結果:{result}"
except Exception as e:
return f"計算エラー:{e}"
tools = [search_database, calculate_metrics]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
def agent_node(state: AgentState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition, {
"tools": "tools",
END: END,
})
graph.add_edge("tools", "agent")
app = graph.compile()
result = app.invoke({
"messages": [HumanMessage(content="Q1売上を照会し、前年同期比成長率を計算してください(昨年Q1は8900万と仮定)")],
})
for msg in result["messages"]:
if hasattr(msg, "content") and msg.content:
print(f"{msg.type}: {msg.content}")
比較分析
| 次元 | LangGraph | CrewAI | AutoGen | LangChain Agent | Dify |
|---|---|---|---|---|---|
| ワークフローオーケストレーション | ✅ 宣言的グラフ | ⚠️ プロセス定義 | ⚠️ 会話駆動 | ❌ リニアチェーン | ✅ ビジュアル |
| 状態管理 | ✅ 内蔵 | ⚠️ 手動 | ⚠️ 手動 | ❌ なし | ✅ 内蔵 |
| ヒューマンインザループ | ✅ interrupt | ❌ | ⚠️ 人間プロキシ | ❌ | ✅ ビジュアル |
| 永続化 | ✅ Checkpointer | ❌ | ❌ | ❌ | ✅ 内蔵 |
| 条件ルーティング | ✅ 条件エッジ | ⚠️ 限定 | ❌ | ❌ | ✅ ビジュアル |
| サブグラフ再利用 | ✅ Subgraph | ❌ | ❌ | ❌ | ⚠️ テンプレート |
| 並列実行 | ✅ | ❌ | ✅ | ❌ | ⚠️ |
| セルフホスト | ✅ | ✅ | ✅ | ✅ | ⚠️ Docker |
| 学習曲線 | 中 | 低 | 中 | 低 | 低 |
| 本番対応 | ✅ | ⚠️ | ⚠️ | ❌ | ✅ |
まとめ:LangGraphは「また一つのエージェントフレームワーク」ではなく、「AIワークフローのオペレーティングシステム」です。コアバリューはStateGraphにあります——命令的if-elseを宣言的グラフで置き換え、手動状態管理をCheckpointerで置き換え、ハードコードルーティングを条件エッジで置き換えます。2026年のマルチエージェント実践パス:まずシングルエージェント状態マシンでプロセスを通す→次にスーパーバイザーパターンでマルチエージェントをオーケストレーション→最後にヒューマンインザループと永続化を追加。鍵は「状態」を第一級市民として扱うこと——すべてのエージェント間通信はStateを通じ、すべてのフロー制御はグラフトポロジーを通じ、すべての中断/再開はCheckpointを通じる。
オンラインツール推奨
- JSONフォーマッター:/ja/json/format
- Base64エンコード/デコード:/ja/encode/base64
- Hash計算:/ja/encode/hash
- JWTデコード:/ja/encode/jwt-decode
ブラウザローカルツールを無料で試す →
#Python#LangGraph#多Agent#AI工作流#状态机#2026#Agent编排