Python LangGraph多Agent协作:从状态机到工作流编排的5种实战模式

AI与大数据

你的AI Agent只会一问一答,复杂任务全靠人工拆解

用户说"帮我分析竞品并生成报告",你的Agent只能一步步追问;需要3个Agent协作完成"调研→分析→写作"流水线,你发现没有现成的编排框架;Agent执行到一半需要人工确认,你不知道怎么暂停和恢复。单Agent的时代已经过去了——2026年,LangGraph让多Agent协作从手工拼凑变成了声明式编排

本文将从LangGraph状态图基础出发,带你完成状态机→多Agent编排→条件路由→人机协作→持久化状态的5种实战模式,从开发到生产,一步不落。


LangGraph核心概念

概念 说明
StateGraph 状态图,定义工作流的节点和边
State 状态,在工作流节点间传递的共享数据结构
Node 节点,执行具体逻辑的函数,接收State返回更新
Edge 边,定义节点间的转移关系
Conditional Edge 条件边,根据State动态决定下一个节点
Checkpoint 检查点,持久化State,支持暂停/恢复
Interrupt 中断,暂停工作流等待外部输入(人机协作)
Tool Node 工具节点,封装外部工具调用
Subgraph 子图,将复杂工作流封装为可复用模块
Command 命令对象,支持节点间的状态更新和路由控制

工作流执行流程

1. 定义State(TypedDict或Pydantic Model)
2. 创建StateGraph(State)
3. 添加节点:graph.add_node("name", function)
4. 添加边:graph.add_edge("node_a", "node_b")
5. 添加条件边:graph.add_conditional_edges("node_a", router)
6. 设置入口:graph.set_entry_point("start")
7. 编译图:app = graph.compile(checkpointer=...)
8. 执行:app.invoke({"input": ...}, config={"configurable": {"thread_id": "..."}})

问题分析:多Agent协作的5大挑战

  1. 状态管理混乱:多Agent间共享状态,手动传递容易遗漏和冲突
  2. 流程编排复杂:条件分支、循环、并行执行,硬编码if-else难以维护
  3. 人机协作困难:Agent需要人工确认时,无法优雅地暂停和恢复
  4. 错误恢复脆弱:长工作流执行到一半失败,从头重试代价巨大
  5. 可观测性缺失:多Agent协作的执行过程像黑盒,调试困难

分步实操:5种实战模式

模式1:基础状态机——单Agent工作流

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


class ResearchState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str
    research_notes: str
    summary: str


llm = ChatOpenAI(model="gpt-4o", temperature=0)


def research_node(state: ResearchState) -> dict:
    messages = [
        SystemMessage(content="你是一个专业研究员。对给定主题进行深入研究,输出详细的研究笔记。"),
        HumanMessage(content=f"请对以下主题进行深入研究:{state['topic']}"),
    ]
    response = llm.invoke(messages)
    return {"research_notes": response.content}


def summarize_node(state: ResearchState) -> dict:
    messages = [
        SystemMessage(content="你是一个专业编辑。将研究笔记总结为简洁的摘要。"),
        HumanMessage(content=f"请总结以下研究笔记:\n\n{state['research_notes']}"),
    ]
    response = llm.invoke(messages)
    return {"summary": response.content}


def format_node(state: ResearchState) -> dict:
    formatted = f"""# 研究报告:{state['topic']}

## 研究笔记
{state['research_notes']}

## 摘要
{state['summary']}
"""
    return {"messages": [HumanMessage(content=formatted)]}


graph = StateGraph(ResearchState)

graph.add_node("research", research_node)
graph.add_node("summarize", summarize_node)
graph.add_node("format", format_node)

graph.add_edge(START, "research")
graph.add_edge("research", "summarize")
graph.add_edge("summarize", "format")
graph.add_edge("format", END)

app = graph.compile()

result = app.invoke({
    "messages": [],
    "topic": "2026年大语言模型技术趋势",
    "research_notes": "",
    "summary": "",
})

print(result["messages"][-1].content)

模式2:多Agent协作——Supervisor模式

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json


class CollaborationState(TypedDict):
    messages: Annotated[list, add_messages]
    task: str
    next_agent: str
    research_result: str
    analysis_result: str
    writing_result: str
    review_feedback: str
    iteration_count: int


llm = ChatOpenAI(model="gpt-4o", temperature=0)


def supervisor_node(state: CollaborationState) -> dict:
    if state["iteration_count"] >= 3:
        return {"next_agent": "end"}

    if not state["research_result"]:
        return {"next_agent": "researcher"}

    if not state["analysis_result"]:
        return {"next_agent": "analyst"}

    if not state["writing_result"]:
        return {"next_agent": "writer"}

    if not state["review_feedback"]:
        return {"next_agent": "reviewer"}

    if "需要修改" in state["review_feedback"]:
        return {
            "next_agent": "writer",
            "writing_result": "",
            "review_feedback": "",
            "iteration_count": state["iteration_count"] + 1,
        }

    return {"next_agent": "end"}


def researcher_node(state: CollaborationState) -> dict:
    messages = [
        SystemMessage(content="你是研究员Agent。收集和整理与任务相关的信息。"),
        HumanMessage(content=f"研究任务:{state['task']}"),
    ]
    response = llm.invoke(messages)
    return {"research_result": response.content}


def analyst_node(state: CollaborationState) -> dict:
    messages = [
        SystemMessage(content="你是分析师Agent。基于研究结果进行深度分析。"),
        HumanMessage(content=f"基于以下研究结果进行分析:\n\n{state['research_result']}"),
    ]
    response = llm.invoke(messages)
    return {"analysis_result": response.content}


def writer_node(state: CollaborationState) -> dict:
    context = f"研究结果:{state['research_result']}\n\n分析结果:{state['analysis_result']}"
    if state["review_feedback"]:
        context += f"\n\n修改意见:{state['review_feedback']}"

    messages = [
        SystemMessage(content="你是写作Agent。基于研究和分析结果撰写高质量文章。"),
        HumanMessage(content=f"撰写文章:\n\n{context}"),
    ]
    response = llm.invoke(messages)
    return {"writing_result": response.content}


def reviewer_node(state: CollaborationState) -> dict:
    messages = [
        SystemMessage(content="你是审稿Agent。审查文章质量,如果需要修改请说明具体意见,如果满意请说'通过'。"),
        HumanMessage(content=f"审查以下文章:\n\n{state['writing_result']}"),
    ]
    response = llm.invoke(messages)
    return {"review_feedback": response.content}


def route_after_supervisor(state: CollaborationState) -> str:
    next_agent = state["next_agent"]
    if next_agent == "end":
        return END
    return next_agent


graph = StateGraph(CollaborationState)

graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("analyst", analyst_node)
graph.add_node("writer", writer_node)
graph.add_node("reviewer", reviewer_node)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_after_supervisor, {
    "researcher": "researcher",
    "analyst": "analyst",
    "writer": "writer",
    "reviewer": "reviewer",
    END: END,
})
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")

app = graph.compile()

result = app.invoke({
    "messages": [],
    "task": "分析2026年AI Agent技术趋势并撰写报告",
    "next_agent": "",
    "research_result": "",
    "analysis_result": "",
    "writing_result": "",
    "review_feedback": "",
    "iteration_count": 0,
})

print(result["writing_result"])

模式3:条件路由——动态工作流

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json


class SupportState(TypedDict):
    messages: Annotated[list, add_messages]
    user_input: str
    intent: str
    category: str
    response: str
    escalated: bool


llm = ChatOpenAI(model="gpt-4o", temperature=0)


def classify_intent_node(state: SupportState) -> dict:
    messages = [
        SystemMessage(content="""你是客服意图分类器。分析用户输入,返回JSON格式:
{"intent": "technical|billing|general|complaint", "category": "具体分类", "escalated": false}

如果用户情绪激动或问题严重,设置escalated为true。"""),
        HumanMessage(content=state["user_input"]),
    ]
    response = llm.invoke(messages)
    try:
        result = json.loads(response.content)
        return {
            "intent": result.get("intent", "general"),
            "category": result.get("category", ""),
            "escalated": result.get("escalated", False),
        }
    except json.JSONDecodeError:
        return {"intent": "general", "category": "未分类", "escalated": False}


def technical_support_node(state: SupportState) -> dict:
    messages = [
        SystemMessage(content="你是技术支持Agent。提供专业的技术问题解决方案。"),
        HumanMessage(content=f"用户问题:{state['user_input']}\n分类:{state['category']}"),
    ]
    response = llm.invoke(messages)
    return {"response": response.content}


def billing_support_node(state: SupportState) -> dict:
    messages = [
        SystemMessage(content="你是账单支持Agent。处理账单相关问题,包括退款、费用查询等。"),
        HumanMessage(content=f"用户问题:{state['user_input']}\n分类:{state['category']}"),
    ]
    response = llm.invoke(messages)
    return {"response": response.content}


def general_support_node(state: SupportState) -> dict:
    messages = [
        SystemMessage(content="你是通用客服Agent。处理一般性咨询问题。"),
        HumanMessage(content=f"用户问题:{state['user_input']}"),
    ]
    response = llm.invoke(messages)
    return {"response": response.content}


def escalation_node(state: SupportState) -> dict:
    messages = [
        SystemMessage(content="你是高级客服Agent。处理需要升级的复杂或紧急问题。"),
        HumanMessage(content=f"紧急问题:{state['user_input']}\n分类:{state['category']}"),
    ]
    response = llm.invoke(messages)
    return {"response": response.content}


def route_by_intent(state: SupportState) -> str:
    if state["escalated"]:
        return "escalation"
    intent_map = {
        "technical": "technical",
        "billing": "billing",
        "general": "general",
        "complaint": "escalation",
    }
    return intent_map.get(state["intent"], "general")


graph = StateGraph(SupportState)

graph.add_node("classify", classify_intent_node)
graph.add_node("technical", technical_support_node)
graph.add_node("billing", billing_support_node)
graph.add_node("general", general_support_node)
graph.add_node("escalation", escalation_node)

graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route_by_intent, {
    "technical": "technical",
    "billing": "billing",
    "general": "general",
    "escalation": "escalation",
})
graph.add_edge("technical", END)
graph.add_edge("billing", END)
graph.add_edge("general", END)
graph.add_edge("escalation", END)

app = graph.compile()

result = app.invoke({
    "messages": [],
    "user_input": "我的服务器突然无法访问,数据库连接超时,非常紧急!",
    "intent": "",
    "category": "",
    "response": "",
    "escalated": False,
})

print(result["response"])

模式4:人机协作(Human-in-the-Loop)

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


class ApprovalState(TypedDict):
    messages: Annotated[list, add_messages]
    task: str
    draft: str
    human_feedback: str
    final_result: str
    approved: bool


llm = ChatOpenAI(model="gpt-4o", temperature=0)
checkpointer = MemorySaver()


def generate_draft_node(state: ApprovalState) -> dict:
    messages = [
        SystemMessage(content="你是内容创作Agent。根据任务要求生成草稿。"),
        HumanMessage(content=f"任务:{state['task']}"),
    ]
    response = llm.invoke(messages)
    return {"draft": response.content}


def human_review_node(state: ApprovalState) -> dict:
    return {}


def process_feedback_node(state: ApprovalState) -> dict:
    if state["approved"]:
        return {"final_result": state["draft"]}

    messages = [
        SystemMessage(content="你是内容修改Agent。根据反馈修改草稿。"),
        HumanMessage(content=f"原始草稿:{state['draft']}\n\n修改意见:{state['human_feedback']}"),
    ]
    response = llm.invoke(messages)
    return {"draft": response.content, "human_feedback": ""}


def should_continue(state: ApprovalState) -> str:
    if state["approved"]:
        return "end"
    return "revise"


graph = StateGraph(ApprovalState)

graph.add_node("generate_draft", generate_draft_node)
graph.add_node("human_review", human_review_node)
graph.add_node("process_feedback", process_feedback_node)

graph.add_edge(START, "generate_draft")
graph.add_edge("generate_draft", "human_review")
graph.add_conditional_edges("human_review", should_continue, {
    "revise": "process_feedback",
    "end": END,
})
graph.add_edge("process_feedback", "human_review")

app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],
)

thread_id = "approval-001"
config = {"configurable": {"thread_id": thread_id}}

result = app.invoke({
    "messages": [],
    "task": "撰写2026年AI行业趋势报告",
    "draft": "",
    "human_feedback": "",
    "final_result": "",
    "approved": False,
}, config=config)

current_state = app.get_state(config)
print("草稿内容:", current_state.values.get("draft", ""))

app.update_state(config, {
    "human_feedback": "请增加关于多模态模型的内容",
    "approved": False,
})

app.invoke(None, config=config)

current_state = app.get_state(config)
print("修改后草稿:", current_state.values.get("draft", ""))

app.update_state(config, {"approved": True})
final_result = app.invoke(None, config=config)
print("最终结果:", final_result["final_result"])

模式5:持久化状态——PostgreSQL Checkpointer

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import asyncio
from psycopg_pool import AsyncConnectionPool


class LongRunningState(TypedDict):
    messages: Annotated[list, add_messages]
    task: str
    step1_result: str
    step2_result: str
    step3_result: str
    current_step: int
    error: str


llm = ChatOpenAI(model="gpt-4o", temperature=0)


def step1_node(state: LongRunningState) -> dict:
    try:
        messages = [
            SystemMessage(content="你是数据处理Agent。执行第一步:数据收集和清洗。"),
            HumanMessage(content=f"处理任务:{state['task']}"),
        ]
        response = llm.invoke(messages)
        return {"step1_result": response.content, "current_step": 1, "error": ""}
    except Exception as e:
        return {"error": str(e), "current_step": state["current_step"]}


def step2_node(state: LongRunningState) -> dict:
    try:
        messages = [
            SystemMessage(content="你是分析Agent。执行第二步:数据分析和建模。"),
            HumanMessage(content=f"基于第一步结果分析:{state['step1_result']}"),
        ]
        response = llm.invoke(messages)
        return {"step2_result": response.content, "current_step": 2, "error": ""}
    except Exception as e:
        return {"error": str(e), "current_step": state["current_step"]}


def step3_node(state: LongRunningState) -> dict:
    try:
        messages = [
            SystemMessage(content="你是报告Agent。执行第三步:生成最终报告。"),
            HumanMessage(content=f"基于分析结果生成报告:{state['step2_result']}"),
        ]
        response = llm.invoke(messages)
        return {"step3_result": response.content, "current_step": 3, "error": ""}
    except Exception as e:
        return {"error": str(e), "current_step": state["current_step"]}


def route_after_error(state: LongRunningState) -> str:
    if state["error"]:
        return END
    if state["current_step"] == 0:
        return "step1"
    if state["current_step"] == 1:
        return "step2"
    if state["current_step"] == 2:
        return "step3"
    return END


graph = StateGraph(LongRunningState)

graph.add_node("step1", step1_node)
graph.add_node("step2", step2_node)
graph.add_node("step3", step3_node)

graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)


async def run_with_persistence():
    connection_string = "postgresql://user:pass@localhost:5432/langgraph"
    async with AsyncConnectionPool(connection_string) as pool:
        checkpointer = AsyncPostgresSaver(pool)
        await checkpointer.setup()

        app = graph.compile(checkpointer=checkpointer)

        thread_id = "long-running-task-001"
        config = {"configurable": {"thread_id": thread_id}}

        result = await app.ainvoke({
            "messages": [],
            "task": "分析Q1销售数据并生成预测报告",
            "step1_result": "",
            "step2_result": "",
            "step3_result": "",
            "current_step": 0,
            "error": "",
        }, config=config)

        state = await app.aget_state(config)
        print(f"当前步骤: {state.values['current_step']}")
        print(f"最终结果: {state.values.get('step3_result', '未完成')}")

        if state.values.get("error"):
            print(f"从步骤 {state.values['current_step']} 恢复...")
            result = await app.ainvoke(None, config=config)


asyncio.run(run_with_persistence())

避坑指南

坑1:State中直接修改可变对象

# ❌ 错误:直接修改state中的列表
def bad_node(state: MyState) -> dict:
    state["items"].append("new_item")  # 直接修改原state
    return state

# ✅ 正确:返回新的值,让LangGraph的reducer处理
def good_node(state: MyState) -> dict:
    return {"items": state["items"] + ["new_item"]}
# 或者使用Annotated + reducer
# items: Annotated[list, operator.add]

坑2:条件边返回了不存在的节点名

# ❌ 错误:路由函数返回了未注册的节点名
def bad_router(state: MyState) -> str:
    return "non_existent_node"

graph.add_conditional_edges("start", bad_router)

# ✅ 正确:路由函数只返回已注册的节点名,并在映射中列出所有可能
def good_router(state: MyState) -> str:
    if state["intent"] == "tech":
        return "technical"
    return "general"

graph.add_conditional_edges("start", good_router, {
    "technical": "technical",
    "general": "general",
})

坑3:忘记设置checkpointer导致无法恢复

# ❌ 错误:没有checkpointer,interrupt_before无法工作
app = graph.compile(interrupt_before=["human_review"])

# ✅ 正确:必须提供checkpointer
from langgraph.checkpoint.memory import MemorySaver
app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["human_review"],
)

坑4:节点函数返回了不完整的状态更新

# ❌ 错误:节点返回了None或空dict,导致状态丢失
def bad_node(state: MyState) -> dict:
    result = do_something()
    # 忘记返回状态更新
    return {}

# ✅ 正确:节点必须返回需要更新的状态字段
def good_node(state: MyState) -> dict:
    result = do_something()
    return {"result": result, "status": "completed"}

坑5:在异步环境中使用同步checkpointer

# ❌ 错误:异步环境中使用同步MemorySaver
from langgraph.checkpoint.memory import MemorySaver
app = graph.compile(checkpointer=MemorySaver())
await app.ainvoke(input_data, config=config)

# ✅ 正确:异步环境使用异步checkpointer
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
app = graph.compile(checkpointer=AsyncPostgresSaver(pool))
await app.ainvoke(input_data, config=config)

报错排查

序号 报错信息 原因 解决方法
1 KeyError: 'field_name' State中缺少必需字段 确保初始invoke包含所有TypedDict字段
2 ValueError: Node 'xxx' not found 条件边引用了未注册的节点 检查add_node和add_conditional_edges的节点名
3 GraphRecursionError 图中存在无限循环 添加循环计数器或终止条件
4 Missing checkpointer 使用interrupt但没有checkpointer 编译时传入checkpointer参数
5 InvalidStateUpdate 节点返回了State中不存在的字段 确保返回的key与TypedDict定义一致
6 asyncio.run() cannot be called from a running event loop 在Jupyter中调用asyncio.run 使用await或nest_asyncio
7 psycopg.OperationalError PostgreSQL连接失败 检查连接字符串、数据库是否运行
8 TypeError: 'NoneType' object is not subscriptable 节点返回None 确保节点函数返回dict
9 LangGraphError: Cannot resume without thread_id 恢复执行时缺少thread_id 在config中提供thread_id
10 RateLimitError from OpenAI API调用频率超限 添加重试逻辑或降低并发

进阶优化

1. 子图封装——可复用工作流模块

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


class ResearchSubState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str
    research_output: str


llm = ChatOpenAI(model="gpt-4o", temperature=0)


def deep_research_node(state: ResearchSubState) -> dict:
    messages = [
        SystemMessage(content="你是深度研究员。进行全面深入的研究。"),
        HumanMessage(content=f"深度研究:{state['topic']}"),
    ]
    response = llm.invoke(messages)
    return {"research_output": response.content}


def fact_check_node(state: ResearchSubState) -> dict:
    messages = [
        SystemMessage(content="你是事实核查员。验证研究结果的准确性。"),
        HumanMessage(content=f"核查以下内容:{state['research_output']}"),
    ]
    response = llm.invoke(messages)
    return {"research_output": f"{state['research_output']}\n\n事实核查:{response.content}"}


research_subgraph = StateGraph(ResearchSubState)
research_subgraph.add_node("deep_research", deep_research_node)
research_subgraph.add_node("fact_check", fact_check_node)
research_subgraph.add_edge(START, "deep_research")
research_subgraph.add_edge("deep_research", "fact_check")
research_subgraph.add_edge("fact_check", END)
research_app = research_subgraph.compile()


class MainState(TypedDict):
    messages: Annotated[list, add_messages]
    task: str
    research_result: str
    writing_result: str


def research_coordinator_node(state: MainState) -> dict:
    result = research_app.invoke({
        "messages": [],
        "topic": state["task"],
        "research_output": "",
    })
    return {"research_result": result["research_output"]}


def writing_node(state: MainState) -> dict:
    messages = [
        SystemMessage(content="你是写作Agent。基于研究结果撰写文章。"),
        HumanMessage(content=f"基于研究结果写作:{state['research_result']}"),
    ]
    response = llm.invoke(messages)
    return {"writing_result": response.content}


main_graph = StateGraph(MainState)
main_graph.add_node("research_coordinator", research_coordinator_node)
main_graph.add_node("writer", writing_node)
main_graph.add_edge(START, "research_coordinator")
main_graph.add_edge("research_coordinator", "writer")
main_graph.add_edge("writer", END)

main_app = main_graph.compile()

2. 并行节点执行

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


class ParallelState(TypedDict):
    messages: Annotated[list, add_messages]
    task: str
    tech_analysis: str
    market_analysis: str
    competitor_analysis: str
    final_report: str


llm = ChatOpenAI(model="gpt-4o", temperature=0)


def tech_analysis_node(state: ParallelState) -> dict:
    messages = [
        SystemMessage(content="你是技术分析Agent。"),
        HumanMessage(content=f"技术分析:{state['task']}"),
    ]
    response = llm.invoke(messages)
    return {"tech_analysis": response.content}


def market_analysis_node(state: ParallelState) -> dict:
    messages = [
        SystemMessage(content="你是市场分析Agent。"),
        HumanMessage(content=f"市场分析:{state['task']}"),
    ]
    response = llm.invoke(messages)
    return {"market_analysis": response.content}


def competitor_analysis_node(state: ParallelState) -> dict:
    messages = [
        SystemMessage(content="你是竞品分析Agent。"),
        HumanMessage(content=f"竞品分析:{state['task']}"),
    ]
    response = llm.invoke(messages)
    return {"competitor_analysis": response.content}


def merge_node(state: ParallelState) -> dict:
    combined = f"""# 综合分析报告

## 技术分析
{state['tech_analysis']}

## 市场分析
{state['market_analysis']}

## 竞品分析
{state['competitor_analysis']}
"""
    return {"final_report": combined}


graph = StateGraph(ParallelState)

graph.add_node("tech", tech_analysis_node)
graph.add_node("market", market_analysis_node)
graph.add_node("competitor", competitor_analysis_node)
graph.add_node("merge", merge_node)

graph.add_edge(START, "tech")
graph.add_edge(START, "market")
graph.add_edge(START, "competitor")
graph.add_edge("tech", "merge")
graph.add_edge("market", "merge")
graph.add_edge("competitor", "merge")
graph.add_edge("merge", END)

app = graph.compile()

3. 工具调用集成

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool


class AgentState(TypedDict):
    messages: Annotated[list, add_messages]


@tool
def search_database(query: str) -> str:
    """搜索数据库获取信息"""
    mock_results = {
        "revenue": "2026年Q1营收:1.2亿元,同比增长35%",
        "users": "当前活跃用户:580万,月增长12%",
        "products": "产品线:3条核心产品,12个SKU",
    }
    for key, value in mock_results.items():
        if key in query.lower():
            return value
    return "未找到相关数据"


@tool
def calculate_metrics(expression: str) -> str:
    """计算业务指标"""
    try:
        result = eval(expression, {"__builtins__": {}}, {})
        return f"计算结果:{result}"
    except Exception as e:
        return f"计算错误:{e}"


tools = [search_database, calculate_metrics]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)


def agent_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


graph = StateGraph(AgentState)

graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))

graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition, {
    "tools": "tools",
    END: END,
})
graph.add_edge("tools", "agent")

app = graph.compile()

result = app.invoke({
    "messages": [HumanMessage(content="查询Q1营收并计算同比增长率(假设去年Q1为8900万)")],
})

for msg in result["messages"]:
    if hasattr(msg, "content") and msg.content:
        print(f"{msg.type}: {msg.content}")

对比分析

维度 LangGraph CrewAI AutoGen LangChain Agent Dify
工作流编排 ✅ 声明式图 ⚠️ 流程定义 ⚠️ 对话驱动 ❌ 线性链 ✅ 可视化
状态管理 ✅ 内置 ⚠️ 手动 ⚠️ 手动 ❌ 无 ✅ 内置
人机协作 ✅ interrupt ⚠️ 人工代理 ✅ 可视化
持久化 ✅ Checkpointer ✅ 内置
条件路由 ✅ 条件边 ⚠️ 有限 ✅ 可视化
子图复用 ✅ Subgraph ⚠️ 模板
并行执行 ⚠️
自部署 ⚠️ Docker
学习曲线
生产就绪 ⚠️ ⚠️

总结:LangGraph不是"又一个Agent框架",而是"AI工作流的操作系统"。它的核心价值在于StateGraph——用声明式图替代命令式if-else,用Checkpointer替代手工状态管理,用条件边替代硬编码路由。2026年的多Agent实践路径:先用单Agent状态机跑通流程→再用Supervisor模式编排多Agent→最后加人机协作和持久化。关键是要把"状态"作为一等公民——所有Agent间的通信都通过State,所有流程控制都通过图的拓扑,所有中断恢复都通过Checkpoint。


在线工具推荐

本站提供浏览器本地工具,免注册即可试用 →

#Python#LangGraph#多Agent#AI工作流#状态机#2026#Agent编排