Python LangGraph多Agent協作:從狀態機到工作流編排的5種實戰模式
AI与大数据
你的AI Agent只會一問一答,複雜任務全靠人工拆解
使用者說「幫我分析競品並產生報告」,你的Agent只能一步步追問;需要3個Agent協作完成「調研→分析→寫作」流水線,你發現沒有現成的編排框架;Agent執行到一半需要人工確認,你不知道怎麼暫停和恢復。單Agent的時代已經過去了——2026年,LangGraph讓多Agent協作從手工拼湊變成了宣告式編排。
本文將從LangGraph狀態圖基礎出發,帶你完成狀態機→多Agent編排→條件路由→人機協作→持久化狀態的5種實戰模式,從開發到生產,一步不落。
LangGraph核心概念
| 概念 | 說明 |
|---|---|
| StateGraph | 狀態圖,定義工作流的節點和邊 |
| State | 狀態,在工作流節點間傳遞的共享資料結構 |
| Node | 節點,執行具體邏輯的函式,接收State回傳更新 |
| Edge | 邊,定義節點間的轉移關係 |
| Conditional Edge | 條件邊,根據State動態決定下一個節點 |
| Checkpoint | 檢查點,持久化State,支援暫停/恢復 |
| Interrupt | 中斷,暫停工作流等待外部輸入(人機協作) |
| Tool Node | 工具節點,封裝外部工具呼叫 |
| Subgraph | 子圖,將複雜工作流封裝為可複用模組 |
| Command | 命令物件,支援節點間的狀態更新和路由控制 |
工作流執行流程
1. 定義State(TypedDict或Pydantic Model)
2. 建立StateGraph(State)
3. 新增節點:graph.add_node("name", function)
4. 新增邊:graph.add_edge("node_a", "node_b")
5. 新增條件邊:graph.add_conditional_edges("node_a", router)
6. 設定入口:graph.set_entry_point("start")
7. 編譯圖:app = graph.compile(checkpointer=...)
8. 執行:app.invoke({"input": ...}, config={"configurable": {"thread_id": "..."}})
問題分析:多Agent協作的5大挑戰
- 狀態管理混亂:多Agent間共享狀態,手動傳遞容易遺漏和衝突
- 流程編排複雜:條件分支、迴圈、平行執行,硬編碼if-else難以維護
- 人機協作困難:Agent需要人工確認時,無法優雅地暫停和恢復
- 錯誤恢復脆弱:長工作流執行到一半失敗,從頭重試代價巨大
- 可觀測性缺失:多Agent協作的執行過程像黑盒,除錯困難
分步實操:5種實戰模式
模式1:基礎狀態機——單Agent工作流
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
research_notes: str
summary: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def research_node(state: ResearchState) -> dict:
messages = [
SystemMessage(content="你是專業研究員。對給定主題進行深入研究,輸出詳細的研究筆記。"),
HumanMessage(content=f"請對以下主題進行深入研究:{state['topic']}"),
]
response = llm.invoke(messages)
return {"research_notes": response.content}
def summarize_node(state: ResearchState) -> dict:
messages = [
SystemMessage(content="你是專業編輯。將研究筆記總結為簡潔的摘要。"),
HumanMessage(content=f"請總結以下研究筆記:\n\n{state['research_notes']}"),
]
response = llm.invoke(messages)
return {"summary": response.content}
def format_node(state: ResearchState) -> dict:
formatted = f"""# 研究報告:{state['topic']}
## 研究筆記
{state['research_notes']}
## 摘要
{state['summary']}
"""
return {"messages": [HumanMessage(content=formatted)]}
graph = StateGraph(ResearchState)
graph.add_node("research", research_node)
graph.add_node("summarize", summarize_node)
graph.add_node("format", format_node)
graph.add_edge(START, "research")
graph.add_edge("research", "summarize")
graph.add_edge("summarize", "format")
graph.add_edge("format", END)
app = graph.compile()
result = app.invoke({
"messages": [],
"topic": "2026年大語言模型技術趨勢",
"research_notes": "",
"summary": "",
})
print(result["messages"][-1].content)
模式2:多Agent協作——Supervisor模式
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class CollaborationState(TypedDict):
messages: Annotated[list, add_messages]
task: str
next_agent: str
research_result: str
analysis_result: str
writing_result: str
review_feedback: str
iteration_count: int
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def supervisor_node(state: CollaborationState) -> dict:
if state["iteration_count"] >= 3:
return {"next_agent": "end"}
if not state["research_result"]:
return {"next_agent": "researcher"}
if not state["analysis_result"]:
return {"next_agent": "analyst"}
if not state["writing_result"]:
return {"next_agent": "writer"}
if not state["review_feedback"]:
return {"next_agent": "reviewer"}
if "需要修改" in state["review_feedback"]:
return {
"next_agent": "writer",
"writing_result": "",
"review_feedback": "",
"iteration_count": state["iteration_count"] + 1,
}
return {"next_agent": "end"}
def researcher_node(state: CollaborationState) -> dict:
messages = [
SystemMessage(content="你是研究員Agent。收集和整理與任務相關的資訊。"),
HumanMessage(content=f"研究任務:{state['task']}"),
]
response = llm.invoke(messages)
return {"research_result": response.content}
def analyst_node(state: CollaborationState) -> dict:
messages = [
SystemMessage(content="你是分析師Agent。基於研究結果進行深度分析。"),
HumanMessage(content=f"基於以下研究結果進行分析:\n\n{state['research_result']}"),
]
response = llm.invoke(messages)
return {"analysis_result": response.content}
def writer_node(state: CollaborationState) -> dict:
context = f"研究結果:{state['research_result']}\n\n分析結果:{state['analysis_result']}"
if state["review_feedback"]:
context += f"\n\n修改意見:{state['review_feedback']}"
messages = [
SystemMessage(content="你是寫作Agent。基於研究和分析結果撰寫高品質文章。"),
HumanMessage(content=f"撰寫文章:\n\n{context}"),
]
response = llm.invoke(messages)
return {"writing_result": response.content}
def reviewer_node(state: CollaborationState) -> dict:
messages = [
SystemMessage(content="你是審稿Agent。審查文章品質,如果需要修改請說明具體意見,如果滿意請說「通過」。"),
HumanMessage(content=f"審查以下文章:\n\n{state['writing_result']}"),
]
response = llm.invoke(messages)
return {"review_feedback": response.content}
def route_after_supervisor(state: CollaborationState) -> str:
next_agent = state["next_agent"]
if next_agent == "end":
return END
return next_agent
graph = StateGraph(CollaborationState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("analyst", analyst_node)
graph.add_node("writer", writer_node)
graph.add_node("reviewer", reviewer_node)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_after_supervisor, {
"researcher": "researcher",
"analyst": "analyst",
"writer": "writer",
"reviewer": "reviewer",
END: END,
})
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")
app = graph.compile()
result = app.invoke({
"messages": [],
"task": "分析2026年AI Agent技術趨勢並撰寫報告",
"next_agent": "",
"research_result": "",
"analysis_result": "",
"writing_result": "",
"review_feedback": "",
"iteration_count": 0,
})
print(result["writing_result"])
模式3:條件路由——動態工作流
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json
class SupportState(TypedDict):
messages: Annotated[list, add_messages]
user_input: str
intent: str
category: str
response: str
escalated: bool
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def classify_intent_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="""你是客服意圖分類器。分析使用者輸入,回傳JSON格式:
{"intent": "technical|billing|general|complaint", "category": "具體分類", "escalated": false}
如果使用者情緒激動或問題嚴重,設定escalated為true。"""),
HumanMessage(content=state["user_input"]),
]
response = llm.invoke(messages)
try:
result = json.loads(response.content)
return {
"intent": result.get("intent", "general"),
"category": result.get("category", ""),
"escalated": result.get("escalated", False),
}
except json.JSONDecodeError:
return {"intent": "general", "category": "未分類", "escalated": False}
def technical_support_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="你是技術支援Agent。提供專業的技術問題解決方案。"),
HumanMessage(content=f"使用者問題:{state['user_input']}\n分類:{state['category']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def billing_support_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="你是帳單支援Agent。處理帳單相關問題,包括退款、費用查詢等。"),
HumanMessage(content=f"使用者問題:{state['user_input']}\n分類:{state['category']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def general_support_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="你是通用客服Agent。處理一般性諮詢問題。"),
HumanMessage(content=f"使用者問題:{state['user_input']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def escalation_node(state: SupportState) -> dict:
messages = [
SystemMessage(content="你是進階客服Agent。處理需要升級的複雜或緊急問題。"),
HumanMessage(content=f"緊急問題:{state['user_input']}\n分類:{state['category']}"),
]
response = llm.invoke(messages)
return {"response": response.content}
def route_by_intent(state: SupportState) -> str:
if state["escalated"]:
return "escalation"
intent_map = {
"technical": "technical",
"billing": "billing",
"general": "general",
"complaint": "escalation",
}
return intent_map.get(state["intent"], "general")
graph = StateGraph(SupportState)
graph.add_node("classify", classify_intent_node)
graph.add_node("technical", technical_support_node)
graph.add_node("billing", billing_support_node)
graph.add_node("general", general_support_node)
graph.add_node("escalation", escalation_node)
graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route_by_intent, {
"technical": "technical",
"billing": "billing",
"general": "general",
"escalation": "escalation",
})
graph.add_edge("technical", END)
graph.add_edge("billing", END)
graph.add_edge("general", END)
graph.add_edge("escalation", END)
app = graph.compile()
result = app.invoke({
"messages": [],
"user_input": "我的伺服器突然無法存取,資料庫連線逾時,非常緊急!",
"intent": "",
"category": "",
"response": "",
"escalated": False,
})
print(result["response"])
模式4:人機協作(Human-in-the-Loop)
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ApprovalState(TypedDict):
messages: Annotated[list, add_messages]
task: str
draft: str
human_feedback: str
final_result: str
approved: bool
llm = ChatOpenAI(model="gpt-4o", temperature=0)
checkpointer = MemorySaver()
def generate_draft_node(state: ApprovalState) -> dict:
messages = [
SystemMessage(content="你是內容創作Agent。根據任務要求產生草稿。"),
HumanMessage(content=f"任務:{state['task']}"),
]
response = llm.invoke(messages)
return {"draft": response.content}
def human_review_node(state: ApprovalState) -> dict:
return {}
def process_feedback_node(state: ApprovalState) -> dict:
if state["approved"]:
return {"final_result": state["draft"]}
messages = [
SystemMessage(content="你是內容修改Agent。根據回饋修改草稿。"),
HumanMessage(content=f"原始草稿:{state['draft']}\n\n修改意見:{state['human_feedback']}"),
]
response = llm.invoke(messages)
return {"draft": response.content, "human_feedback": ""}
def should_continue(state: ApprovalState) -> str:
if state["approved"]:
return "end"
return "revise"
graph = StateGraph(ApprovalState)
graph.add_node("generate_draft", generate_draft_node)
graph.add_node("human_review", human_review_node)
graph.add_node("process_feedback", process_feedback_node)
graph.add_edge(START, "generate_draft")
graph.add_edge("generate_draft", "human_review")
graph.add_conditional_edges("human_review", should_continue, {
"revise": "process_feedback",
"end": END,
})
graph.add_edge("process_feedback", "human_review")
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"],
)
thread_id = "approval-001"
config = {"configurable": {"thread_id": thread_id}}
result = app.invoke({
"messages": [],
"task": "撰寫2026年AI行業趨勢報告",
"draft": "",
"human_feedback": "",
"final_result": "",
"approved": False,
}, config=config)
current_state = app.get_state(config)
print("草稿內容:", current_state.values.get("draft", ""))
app.update_state(config, {
"human_feedback": "請增加關於多模態模型的內容",
"approved": False,
})
app.invoke(None, config=config)
current_state = app.get_state(config)
print("修改後草稿:", current_state.values.get("draft", ""))
app.update_state(config, {"approved": True})
final_result = app.invoke(None, config=config)
print("最終結果:", final_result["final_result"])
模式5:持久化狀態——PostgreSQL Checkpointer
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import asyncio
from psycopg_pool import AsyncConnectionPool
class LongRunningState(TypedDict):
messages: Annotated[list, add_messages]
task: str
step1_result: str
step2_result: str
step3_result: str
current_step: int
error: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def step1_node(state: LongRunningState) -> dict:
try:
messages = [
SystemMessage(content="你是資料處理Agent。執行第一步:資料收集和清洗。"),
HumanMessage(content=f"處理任務:{state['task']}"),
]
response = llm.invoke(messages)
return {"step1_result": response.content, "current_step": 1, "error": ""}
except Exception as e:
return {"error": str(e), "current_step": state["current_step"]}
def step2_node(state: LongRunningState) -> dict:
try:
messages = [
SystemMessage(content="你是分析Agent。執行第二步:資料分析和建模。"),
HumanMessage(content=f"基於第一步結果分析:{state['step1_result']}"),
]
response = llm.invoke(messages)
return {"step2_result": response.content, "current_step": 2, "error": ""}
except Exception as e:
return {"error": str(e), "current_step": state["current_step"]}
def step3_node(state: LongRunningState) -> dict:
try:
messages = [
SystemMessage(content="你是報告Agent。執行第三步:產生最終報告。"),
HumanMessage(content=f"基於分析結果產生報告:{state['step2_result']}"),
]
response = llm.invoke(messages)
return {"step3_result": response.content, "current_step": 3, "error": ""}
except Exception as e:
return {"error": str(e), "current_step": state["current_step"]}
graph = StateGraph(LongRunningState)
graph.add_node("step1", step1_node)
graph.add_node("step2", step2_node)
graph.add_node("step3", step3_node)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)
async def run_with_persistence():
connection_string = "postgresql://user:pass@localhost:5432/langgraph"
async with AsyncConnectionPool(connection_string) as pool:
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
app = graph.compile(checkpointer=checkpointer)
thread_id = "long-running-task-001"
config = {"configurable": {"thread_id": thread_id}}
result = await app.ainvoke({
"messages": [],
"task": "分析Q1銷售資料並產生預測報告",
"step1_result": "",
"step2_result": "",
"step3_result": "",
"current_step": 0,
"error": "",
}, config=config)
state = await app.aget_state(config)
print(f"當前步驟: {state.values['current_step']}")
print(f"最終結果: {state.values.get('step3_result', '未完成')}")
if state.values.get("error"):
print(f"從步驟 {state.values['current_step']} 恢復...")
result = await app.ainvoke(None, config=config)
asyncio.run(run_with_persistence())
避坑指南
坑1:State中直接修改可變物件
# ❌ 錯誤:直接修改state中的清單
def bad_node(state: MyState) -> dict:
state["items"].append("new_item") # 直接修改原state
return state
# ✅ 正確:回傳新的值,讓LangGraph的reducer處理
def good_node(state: MyState) -> dict:
return {"items": state["items"] + ["new_item"]}
# 或者使用Annotated + reducer
# items: Annotated[list, operator.add]
坑2:條件邊回傳了不存在的節點名
# ❌ 錯誤:路由函式回傳了未註冊的節點名
def bad_router(state: MyState) -> str:
return "non_existent_node"
graph.add_conditional_edges("start", bad_router)
# ✅ 正確:路由函式只回傳已註冊的節點名,並在映射中列出所有可能
def good_router(state: MyState) -> str:
if state["intent"] == "tech":
return "technical"
return "general"
graph.add_conditional_edges("start", good_router, {
"technical": "technical",
"general": "general",
})
坑3:忘記設定checkpointer導致無法恢復
# ❌ 錯誤:沒有checkpointer,interrupt_before無法運作
app = graph.compile(interrupt_before=["human_review"])
# ✅ 正確:必須提供checkpointer
from langgraph.checkpoint.memory import MemorySaver
app = graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["human_review"],
)
坑4:節點函式回傳了不完整的狀態更新
# ❌ 錯誤:節點回傳了None或空dict,導致狀態遺失
def bad_node(state: MyState) -> dict:
result = do_something()
# 忘記回傳狀態更新
return {}
# ✅ 正確:節點必須回傳需要更新的狀態欄位
def good_node(state: MyState) -> dict:
result = do_something()
return {"result": result, "status": "completed"}
坑5:在非同步環境中使用同步checkpointer
# ❌ 錯誤:非同步環境中使用同步MemorySaver
from langgraph.checkpoint.memory import MemorySaver
app = graph.compile(checkpointer=MemorySaver())
await app.ainvoke(input_data, config=config)
# ✅ 正確:非同步環境使用非同步checkpointer
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
app = graph.compile(checkpointer=AsyncPostgresSaver(pool))
await app.ainvoke(input_data, config=config)
報錯排查
| 序號 | 報錯資訊 | 原因 | 解決方法 |
|---|---|---|---|
| 1 | KeyError: 'field_name' |
State中缺少必要欄位 | 確保初始invoke包含所有TypedDict欄位 |
| 2 | ValueError: Node 'xxx' not found |
條件邊引用了未註冊的節點 | 檢查add_node和add_conditional_edges的節點名 |
| 3 | GraphRecursionError |
圖中存在無限迴圈 | 新增迴圈計數器或終止條件 |
| 4 | Missing checkpointer |
使用interrupt但沒有checkpointer | 編譯時傳入checkpointer引數 |
| 5 | InvalidStateUpdate |
節點回傳了State中不存在的欄位 | 確保回傳的key與TypedDict定義一致 |
| 6 | asyncio.run() cannot be called from a running event loop |
在Jupyter中呼叫asyncio.run | 使用await或nest_asyncio |
| 7 | psycopg.OperationalError |
PostgreSQL連線失敗 | 檢查連線字串、資料庫是否執行 |
| 8 | TypeError: 'NoneType' object is not subscriptable |
節點回傳None | 確保節點函式回傳dict |
| 9 | LangGraphError: Cannot resume without thread_id |
恢復執行時缺少thread_id | 在config中提供thread_id |
| 10 | RateLimitError from OpenAI |
API呼叫頻率超限 | 新增重試邏輯或降低並行 |
進階最佳化
1. 子圖封裝——可複用工作流模組
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ResearchSubState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
research_output: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def deep_research_node(state: ResearchSubState) -> dict:
messages = [
SystemMessage(content="你是深度研究員。進行全面深入的研究。"),
HumanMessage(content=f"深度研究:{state['topic']}"),
]
response = llm.invoke(messages)
return {"research_output": response.content}
def fact_check_node(state: ResearchSubState) -> dict:
messages = [
SystemMessage(content="你是事實核查員。驗證研究結果的準確性。"),
HumanMessage(content=f"核查以下內容:{state['research_output']}"),
]
response = llm.invoke(messages)
return {"research_output": f"{state['research_output']}\n\n事實核查:{response.content}"}
research_subgraph = StateGraph(ResearchSubState)
research_subgraph.add_node("deep_research", deep_research_node)
research_subgraph.add_node("fact_check", fact_check_node)
research_subgraph.add_edge(START, "deep_research")
research_subgraph.add_edge("deep_research", "fact_check")
research_subgraph.add_edge("fact_check", END)
research_app = research_subgraph.compile()
class MainState(TypedDict):
messages: Annotated[list, add_messages]
task: str
research_result: str
writing_result: str
def research_coordinator_node(state: MainState) -> dict:
result = research_app.invoke({
"messages": [],
"topic": state["task"],
"research_output": "",
})
return {"research_result": result["research_output"]}
def writing_node(state: MainState) -> dict:
messages = [
SystemMessage(content="你是寫作Agent。基於研究結果撰寫文章。"),
HumanMessage(content=f"基於研究結果寫作:{state['research_result']}"),
]
response = llm.invoke(messages)
return {"writing_result": response.content}
main_graph = StateGraph(MainState)
main_graph.add_node("research_coordinator", research_coordinator_node)
main_graph.add_node("writer", writing_node)
main_graph.add_edge(START, "research_coordinator")
main_graph.add_edge("research_coordinator", "writer")
main_graph.add_edge("writer", END)
main_app = main_graph.compile()
2. 平行節點執行
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class ParallelState(TypedDict):
messages: Annotated[list, add_messages]
task: str
tech_analysis: str
market_analysis: str
competitor_analysis: str
final_report: str
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def tech_analysis_node(state: ParallelState) -> dict:
messages = [
SystemMessage(content="你是技術分析Agent。"),
HumanMessage(content=f"技術分析:{state['task']}"),
]
response = llm.invoke(messages)
return {"tech_analysis": response.content}
def market_analysis_node(state: ParallelState) -> dict:
messages = [
SystemMessage(content="你是市場分析Agent。"),
HumanMessage(content=f"市場分析:{state['task']}"),
]
response = llm.invoke(messages)
return {"market_analysis": response.content}
def competitor_analysis_node(state: ParallelState) -> dict:
messages = [
SystemMessage(content="你是競品分析Agent。"),
HumanMessage(content=f"競品分析:{state['task']}"),
]
response = llm.invoke(messages)
return {"competitor_analysis": response.content}
def merge_node(state: ParallelState) -> dict:
combined = f"""# 綜合分析報告
## 技術分析
{state['tech_analysis']}
## 市場分析
{state['market_analysis']}
## 競品分析
{state['competitor_analysis']}
"""
return {"final_report": combined}
graph = StateGraph(ParallelState)
graph.add_node("tech", tech_analysis_node)
graph.add_node("market", market_analysis_node)
graph.add_node("competitor", competitor_analysis_node)
graph.add_node("merge", merge_node)
graph.add_edge(START, "tech")
graph.add_edge(START, "market")
graph.add_edge(START, "competitor")
graph.add_edge("tech", "merge")
graph.add_edge("market", "merge")
graph.add_edge("competitor", "merge")
graph.add_edge("merge", END)
app = graph.compile()
3. 工具呼叫整合
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
@tool
def search_database(query: str) -> str:
"""搜尋資料庫取得資訊"""
mock_results = {
"revenue": "2026年Q1營收:1.2億元,同比增長35%",
"users": "當前活躍使用者:580萬,月增長12%",
"products": "產品線:3條核心產品,12個SKU",
}
for key, value in mock_results.items():
if key in query.lower():
return value
return "未找到相關資料"
@tool
def calculate_metrics(expression: str) -> str:
"""計算業務指標"""
try:
result = eval(expression, {"__builtins__": {}}, {})
return f"計算結果:{result}"
except Exception as e:
return f"計算錯誤:{e}"
tools = [search_database, calculate_metrics]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
def agent_node(state: AgentState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition, {
"tools": "tools",
END: END,
})
graph.add_edge("tools", "agent")
app = graph.compile()
result = app.invoke({
"messages": [HumanMessage(content="查詢Q1營收並計算同比增長率(假設去年Q1為8900萬)")],
})
for msg in result["messages"]:
if hasattr(msg, "content") and msg.content:
print(f"{msg.type}: {msg.content}")
對比分析
| 維度 | LangGraph | CrewAI | AutoGen | LangChain Agent | Dify |
|---|---|---|---|---|---|
| 工作流編排 | ✅ 宣告式圖 | ⚠️ 流程定義 | ⚠️ 對話驅動 | ❌ 線性鏈 | ✅ 視覺化 |
| 狀態管理 | ✅ 內建 | ⚠️ 手動 | ⚠️ 手動 | ❌ 無 | ✅ 內建 |
| 人機協作 | ✅ interrupt | ❌ | ⚠️ 人工代理 | ❌ | ✅ 視覺化 |
| 持久化 | ✅ Checkpointer | ❌ | ❌ | ❌ | ✅ 內建 |
| 條件路由 | ✅ 條件邊 | ⚠️ 有限 | ❌ | ❌ | ✅ 視覺化 |
| 子圖複用 | ✅ Subgraph | ❌ | ❌ | ❌ | ⚠️ 範本 |
| 平行執行 | ✅ | ❌ | ✅ | ❌ | ⚠️ |
| 自部署 | ✅ | ✅ | ✅ | ✅ | ⚠️ Docker |
| 學習曲線 | 中 | 低 | 中 | 低 | 低 |
| 生產就緒 | ✅ | ⚠️ | ⚠️ | ❌ | ✅ |
總結:LangGraph不是「又一個Agent框架」,而是「AI工作流的作業系統」。它的核心價值在於StateGraph——用宣告式圖替代命令式if-else,用Checkpointer替代手工狀態管理,用條件邊替代硬編碼路由。2026年的多Agent實踐路徑:先用單Agent狀態機跑通流程→再用Supervisor模式編排多Agent→最後加人機協作和持久化。關鍵是要把「狀態」作為一等公民——所有Agent間的通訊都透過State,所有流程控制都透過圖的拓撲,所有中斷恢復都透過Checkpoint。
線上工具推薦
- JSON格式化:/zh-TW/json/format
- Base64編解碼:/zh-TW/encode/base64
- Hash計算:/zh-TW/encode/hash
- JWT解碼:/zh-TW/encode/jwt-decode
本站提供瀏覽器本地工具,免註冊即可試用 →
#Python#LangGraph#多Agent#AI工作流#状态机#2026#Agent编排