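Building an AI Agent Workflow Engine from Scratch: A Production-Grade Agent Orchestration System in Java
Technical Architecture
Why Build Your Own AI Agent Workflow Engine?
By 2026, AI agents have moved from "toys" to production, but the limitations of existing frameworks are increasingly apparent: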
| Framework | Language | Workflow model | DAG support | State persistence | Enterprise-ready |
|---|---|---|---|---|---|
| LangChain/LangGraph | Python | State machine | ✅ | ❌ (build your own) | ⚠️ |
| CrewAI | Python | Sequential/hierarchical | ❌ | ❌ | ❌ |
| AutoGen | Python | Conversational | ❌ | ❌ | ❌ |
| Spring AI | Java | Linear chain | ❌ | ❌ | ⚠️ |
| Dify | Python/Go | Visual DAG | ✅ | ✅ | ✅ |
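Core pain points:
- The LangChain ecosystem is tied to Python: Java enterprise projects cannot reuse it directly, and cross-language calls add serialization overhead and painful debugging
- Spring AI only supports linear chains: a ChatClient → Tool → Advisor call chain cannot express complex branching or parallel logic
- No state management: if an agent crashes midway through execution, it cannot resume from where it stopped
- Primitive multi-agent collaboration: there are no built-in collaboration modes such as Sequential, Parallel, Router, or Debate
Real-world case: a financial company built a risk-control agent on LangGraph, hit a concurrency bottleneck caused by the Python GIL, and reached only 120 QPS on a single machine; after migrating to a self-built Java engine, QPS rose to 1800.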
Agent Workflow Engine Architecture
Five-Layer Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│  API Layer (Layer 1)                                            │
│  REST Controller │ GraphQL │ WebSocket (SSE streaming)          │
├─────────────────────────────────────────────────────────────────┤
│  Orchestration Layer (Layer 2)                                  │
│  WorkflowEngine │ DAGScheduler │ ConditionRouter │ LoopHandler  │
├─────────────────────────────────────────────────────────────────┤
│  Node Layer (Layer 3)                                           │
│  LLMNode │ ToolNode │ TransformNode │ SubWorkflowNode           │
│  HumanApprovalNode │ ParallelGroupNode │ RouterNode             │
├─────────────────────────────────────────────────────────────────┤
│  State Management Layer (Layer 4)                               │
│  StateMachine (Spring Statemachine) │ CheckpointManager         │
│  Redis persistence │ Execution context (WorkflowContext)        │
├─────────────────────────────────────────────────────────────────┤
│  Infrastructure Layer (Layer 5)                                 │
│  ToolRegistry │ ModelRegistry │ EventBus │ AuditLogger          │
│  MetricsCollector │ RetryPolicy │ CircuitBreaker                │
└─────────────────────────────────────────────────────────────────┘
Core Domain Model
public class WorkflowDefinition {
private String workflowId;
private String name;
private String version;
private List<NodeDefinition> nodes;
private List<EdgeDefinition> edges;
private Map<String, Object> metadata;
private WorkflowConfig config;
}
public class NodeDefinition {
private String nodeId;
private String type;
private Map<String, Object> properties;
private RetryPolicy retryPolicy;
private TimeoutConfig timeout;
}
public class EdgeDefinition {
private String sourceId;
private String targetId;
private String condition;
private EdgeType type;
}
public enum EdgeType {
NORMAL, CONDITIONAL, PARALLEL, LOOP
}
Workflow Execution Context
public class WorkflowContext {
private String executionId;
private String workflowId;
private Map<String, Object> variables;
private Map<String, NodeResult> nodeResults;
private WorkflowState currentState;
private Instant startTime;
private String checkpointKey;
public <T> T getVariable(String key, Class<T> type) {
Object value = variables.get(key);
return type.cast(value);
}
public void setVariable(String key, Object value) {
variables.put(key, value);
}
public NodeResult getNodeResult(String nodeId) {
return nodeResults.get(nodeId);
}
}
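Core Implementation of the DAG Task Scheduler
A DAG (directed acyclic graph) is the scheduling core of the workflow engine. Three key problems need to be solved: cycle detection, topological sorting, and parallel execution of nodes that do not depend on each other.
Cycle Detection Algorithm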
@Component
public class CycleDetector {
public boolean hasCycle(List<NodeDefinition> nodes, List<EdgeDefinition> edges) {
Map<String, Set<String>> adjacency = buildAdjacencyMap(nodes, edges);
Set<String> visited = new HashSet<>();
Set<String> recursionStack = new HashSet<>();
for (NodeDefinition node : nodes) {
if (dfsCycleCheck(node.getNodeId(), adjacency, visited, recursionStack)) {
return true;
}
}
return false;
}
private boolean dfsCycleCheck(String nodeId,
Map<String, Set<String>> adjacency,
Set<String> visited,
Set<String> recursionStack) {
visited.add(nodeId);
recursionStack.add(nodeId);
for (String neighbor : adjacency.getOrDefault(nodeId, Set.of())) {
if (!visited.contains(neighbor)) {
if (dfsCycleCheck(neighbor, adjacency, visited, recursionStack)) {
return true;
}
} else if (recursionStack.contains(neighbor)) {
return true;
}
}
recursionStack.remove(nodeId);
return false;
}
private Map<String, Set<String>> buildAdjacencyMap(
List<NodeDefinition> nodes, List<EdgeDefinition> edges) {
Map<String, Set<String>> adjacency = new HashMap<>();
for (NodeDefinition node : nodes) {
adjacency.put(node.getNodeId(), new HashSet<>());
}
for (EdgeDefinition edge : edges) {
adjacency.get(edge.getSourceId()).add(edge.getTargetId());
}
return adjacency;
}
}
Topological Sorting and Level Partitioning
@Component
public class TopologicalSorter {
public List<Set<String>> sortIntoLevels(List<NodeDefinition> nodes,
List<EdgeDefinition> edges) {
Map<String, Set<String>> inEdges = new HashMap<>();
Map<String, Set<String>> outEdges = new HashMap<>();
for (NodeDefinition node : nodes) {
inEdges.put(node.getNodeId(), new HashSet<>());
outEdges.put(node.getNodeId(), new HashSet<>());
}
for (EdgeDefinition edge : edges) {
outEdges.get(edge.getSourceId()).add(edge.getTargetId());
inEdges.get(edge.getTargetId()).add(edge.getSourceId());
}
List<Set<String>> levels = new ArrayList<>();
Set<String> processed = new HashSet<>();
while (processed.size() < nodes.size()) {
Set<String> currentLevel = new HashSet<>();
for (NodeDefinition node : nodes) {
String nodeId = node.getNodeId();
if (!processed.contains(nodeId) &&
inEdges.get(nodeId).stream().allMatch(processed::contains)) {
currentLevel.add(nodeId);
}
}
if (currentLevel.isEmpty()) {
throw new WorkflowException("DAG contains cycle, cannot sort");
}
levels.add(currentLevel);
processed.addAll(currentLevel);
}
return levels;
}
}
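The level scan above rechecks every unprocessed node on each pass. A common alternative is Kahn's algorithm with in-degree counters, which touches each edge once and produces the same level grouping. The following is a minimal standalone sketch rather than the engine's actual code: `LevelSorter` and the `String[]` edge pairs are hypothetical stand-ins for `TopologicalSorter`, `NodeDefinition`, and `EdgeDefinition`, and every edge is assumed to reference a declared node.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone helper; plain strings stand in for NodeDefinition/EdgeDefinition.
final class LevelSorter {
    // Each edge is {sourceId, targetId}. Returns node ids grouped by dependency level.
    static List<List<String>> sortIntoLevels(List<String> nodes, List<String[]> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> successors = new HashMap<>();
        for (String node : nodes) {
            inDegree.put(node, 0);
            successors.put(node, new ArrayList<>());
        }
        for (String[] edge : edges) {
            successors.get(edge[0]).add(edge[1]);
            inDegree.merge(edge[1], 1, Integer::sum);
        }
        // Level 0: nodes with no incoming edges, in declaration order.
        List<String> current = new ArrayList<>();
        for (String node : nodes) {
            if (inDegree.get(node) == 0) current.add(node);
        }
        List<List<String>> levels = new ArrayList<>();
        int placed = 0;
        while (!current.isEmpty()) {
            levels.add(current);
            placed += current.size();
            List<String> next = new ArrayList<>();
            for (String node : current) {
                for (String successor : successors.get(node)) {
                    // A node joins the next level once its last predecessor has been placed.
                    if (inDegree.merge(successor, -1, Integer::sum) == 0) next.add(successor);
                }
            }
            current = next;
        }
        // Nodes on a cycle never reach in-degree zero, so they are never placed.
        if (placed < nodes.size()) {
            throw new IllegalStateException("Graph contains a cycle");
        }
        return levels;
    }
}
```

Because leftover nodes signal a cycle, this variant can double as the cycle check, at the cost of not reporting which nodes form the cycle.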
DAG Scheduler: Executing Independent Nodes in Parallel
@Service
public class DAGScheduler {
private final TopologicalSorter sorter;
private final CycleDetector cycleDetector;
private final NodeExecutorFactory executorFactory;
private final CheckpointManager checkpointManager;
@Async("workflowExecutor")
public CompletableFuture<WorkflowResult> execute(
WorkflowDefinition workflow, WorkflowContext context) {
if (cycleDetector.hasCycle(workflow.getNodes(), workflow.getEdges())) {
throw new WorkflowException("Workflow contains cycle");
}
List<Set<String>> levels = sorter.sortIntoLevels(
workflow.getNodes(), workflow.getEdges());
for (int i = 0; i < levels.size(); i++) {
Set<String> levelNodes = levels.get(i);
log.info("Executing level {} with {} nodes: {}", i, levelNodes.size(), levelNodes);
List<CompletableFuture<NodeResult>> futures = levelNodes.stream()
.map(nodeId -> executeNode(workflow, nodeId, context))
.toList();
CompletableFuture<Void> levelFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
levelFuture.join();
for (CompletableFuture<NodeResult> future : futures) {
NodeResult result = future.join();
context.getNodeResults().put(result.getNodeId(), result);
}
checkpointManager.saveCheckpoint(context);
}
return CompletableFuture.completedFuture(
WorkflowResult.success(context));
}
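private CompletableFuture<NodeResult> executeNode(
WorkflowDefinition workflow, String nodeId, WorkflowContext context) {
NodeDefinition nodeDef = workflow.getNodes().stream()
.filter(n -> n.getNodeId().equals(nodeId))
.findFirst()
.orElseThrow(() -> new WorkflowException("Unknown node: " + nodeId));
NodeExecutor executor = executorFactory.getExecutor(nodeDef.getType());
// Executors such as LLMNodeExecutor block and then return an already-completed future,
// so calling them inline would run the nodes of one level one after another.
// Submitting the call to a pool lets nodes in the same level overlap. The common pool
// is used here for brevity; a dedicated executor is preferable for blocking I/O.
return CompletableFuture
.supplyAsync(() -> executor.execute(nodeDef, context))
.thenCompose(future -> future);
}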
}
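How a run is scheduled:
Level 0: [classify intent] ← no dependencies, runs immediately
Level 1: [query order] [query knowledge base] ← depend on Level 0, run in parallel
Level 2: [aggregate results] ← waits for all of Level 1 to finish
Level 3: [generate reply] ← depends on Level 2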
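The level-by-level schedule can be tried out without Spring. The sketch below is an illustration under stated assumptions, not the engine's scheduler: `LevelRunner` is a hypothetical name, each node is modelled as a function from the results so far to a non-null string, and a fixed pool of four threads is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch: runs each level's nodes concurrently, with a barrier between levels.
final class LevelRunner {
    static Map<String, String> run(List<List<String>> levels,
                                   Map<String, Function<Map<String, String>, String>> tasks) {
        // Concurrent map because sibling nodes write their results from different threads.
        Map<String, String> results = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (List<String> level : levels) {
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                for (String nodeId : level) {
                    // Nodes in the same level are submitted together and may overlap.
                    futures.add(CompletableFuture.runAsync(
                            () -> results.put(nodeId, tasks.get(nodeId).apply(results)), pool));
                }
                // Barrier: the next level starts only after every node in this one finishes.
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            }
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

A node only reads results from earlier levels, which are complete by the time it runs, so the barrier is what makes the shared map safe to read. If any node throws, `join()` rethrows the failure wrapped in a `CompletionException` and later levels are skipped.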
Tool Registration and Invocation Framework
ToolRegistry: A Unified Tool Registry
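// No @Service here: the instance is created and populated by the @Bean method in
// ToolRegistryConfig, and annotating the class as well would register a second, conflicting bean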
public class ToolRegistry {
private final Map<String, ToolDescriptor> tools = new ConcurrentHashMap<>();
private final Map<String, ToolExecutor> executors = new ConcurrentHashMap<>();
public void register(ToolDescriptor descriptor, ToolExecutor executor) {
tools.put(descriptor.getName(), descriptor);
executors.put(descriptor.getName(), executor);
log.info("Registered tool: {} with schema: {}",
descriptor.getName(), descriptor.getInputSchema());
}
public ToolExecutionResult execute(String toolName, Map<String, Object> input) {
ToolExecutor executor = executors.get(toolName);
if (executor == null) {
throw new ToolNotFoundException("Tool not found: " + toolName);
}
ToolDescriptor descriptor = tools.get(toolName);
validateInput(descriptor, input);
return executor.execute(input);
}
public List<ToolDescriptor> getAllTools() {
return new ArrayList<>(tools.values());
}
public List<Map<String, Object>> getOpenAIToolSchemas() {
return tools.values().stream()
.map(this::convertToOpenAISchema)
.toList();
}
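private void validateInput(ToolDescriptor descriptor, Map<String, Object> input) {
// Assuming the networknt json-schema-validator library (the type names match it):
// the factory needs an explicit spec version, and validate() takes a Jackson JsonNode, not a Map
JsonSchema schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7)
.getSchema(descriptor.getInputSchema());
JsonNode inputNode = new ObjectMapper().valueToTree(input);
Set<ValidationMessage> errors = schema.validate(inputNode);
if (!errors.isEmpty()) {
throw new ToolValidationException(
"Input validation failed: " + errors);
}
}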
}
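`getOpenAIToolSchemas()` relies on a `convertToOpenAISchema` method that is not shown. As a rough sketch of what it might produce, assuming the Chat Completions function-tool layout (`type: "function"` wrapping a `function` object with `name`, `description`, and `parameters`): the class name `OpenAIToolSchemas` is hypothetical, and the three parameters stand in for the fields of `ToolDescriptor`, with the input schema already parsed into a `Map`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the mapping; three parameters replace the real ToolDescriptor.
final class OpenAIToolSchemas {
    // parameters is the tool's input JSON Schema, already parsed into a Map.
    static Map<String, Object> toOpenAISchema(String name, String description,
                                              Map<String, Object> parameters) {
        Map<String, Object> function = new LinkedHashMap<>();
        function.put("name", name);
        function.put("description", description);
        function.put("parameters", parameters);

        // LinkedHashMap keeps key order stable, which makes serialized output easier to diff.
        Map<String, Object> tool = new LinkedHashMap<>();
        tool.put("type", "function");
        tool.put("function", function);
        return tool;
    }
}
```

Since tool descriptors rarely change after startup, the converted maps could be computed once at registration time instead of on every call.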
ToolExecutor Interface and Implementations
public interface ToolExecutor {
ToolExecutionResult execute(Map<String, Object> input);
String getName();
String getDescription();
}
@Component
public class HttpToolExecutor implements ToolExecutor {
private final RestTemplate restTemplate;
@Override
public ToolExecutionResult execute(Map<String, Object> input) {
String url = (String) input.get("url");
String method = (String) input.getOrDefault("method", "GET");
Map<String, String> headers = (Map<String, String>) input.getOrDefault("headers", Map.of());
Object body = input.get("body");
HttpHeaders httpHeaders = new HttpHeaders();
headers.forEach(httpHeaders::set);
HttpEntity<Object> entity = new HttpEntity<>(body, httpHeaders);
ResponseEntity<String> response = restTemplate.exchange(
url, HttpMethod.valueOf(method), entity, String.class);
return ToolExecutionResult.builder()
.success(true)
.data(response.getBody())
.metadata(Map.of("statusCode", response.getStatusCode().value()))
.build();
}
@Override
public String getName() { return "http_request"; }
@Override
public String getDescription() { return "Execute HTTP request to external API"; }
}
@Component
public class DatabaseToolExecutor implements ToolExecutor {
private final JdbcTemplate jdbcTemplate;
@Override
public ToolExecutionResult execute(Map<String, Object> input) {
String sql = (String) input.get("sql");
List<Object> params = (List<Object>) input.getOrDefault("params", List.of());
if (!isSafeQuery(sql)) {
throw new SecurityException("Unsafe SQL query blocked: " + sql);
}
List<Map<String, Object>> results = jdbcTemplate.queryForList(sql, params.toArray());
return ToolExecutionResult.builder()
.success(true)
.data(results)
.build();
}
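private boolean isSafeQuery(String sql) {
// Heuristic guard only; the database account itself should also be read-only.
// Word boundaries avoid rejecting column names such as updated_at, and the
// semicolon check blocks stacked statements.
String upper = sql.toUpperCase().trim();
return upper.startsWith("SELECT") && !upper.contains(";")
&& !upper.matches("(?s).*\\b(DROP|DELETE|UPDATE|INSERT|ALTER|TRUNCATE)\\b.*");
}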
@Override
public String getName() { return "database_query"; }
@Override
public String getDescription() { return "Execute read-only SQL query"; }
}
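Keyword filters of this kind are sensitive to how they match: plain substring matching would reject a harmless column such as `updated_at`, while word-boundary matching would not. The standalone sketch below is one way to experiment with the trade-offs. `SqlGuard` and its keyword list are hypothetical, and a filter like this is a second line of defense at best, not a replacement for a read-only database account.

```java
import java.util.regex.Pattern;

// Hypothetical standalone version of a read-only SQL guard.
final class SqlGuard {
    // Whole-word match, so identifiers like updated_at or deleted_flag are not flagged.
    private static final Pattern WRITE_KEYWORD = Pattern.compile(
            "\\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE|GRANT)\\b",
            Pattern.CASE_INSENSITIVE);

    static boolean isReadOnly(String sql) {
        if (sql == null) return false;
        String trimmed = sql.trim();
        return trimmed.regionMatches(true, 0, "SELECT", 0, 6)   // must start with SELECT
                && !trimmed.contains(";")                        // no stacked statements
                && !WRITE_KEYWORD.matcher(trimmed).find();       // no write/DDL keywords
    }
}
```

The check still produces false positives, for example a string literal that contains the word "delete", and it rejects a trailing semicolon. Erring on the side of rejection seems the safer default for queries written by a model.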
@Component
public class FileToolExecutor implements ToolExecutor {
@Value("${tool.file.base-dir}")
private String baseDir;
@Override
public ToolExecutionResult execute(Map<String, Object> input) {
String operation = (String) input.get("operation");
String path = (String) input.get("path");
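Path base = Paths.get(baseDir).toAbsolutePath().normalize();
Path resolvedPath = base.resolve(path).normalize();
// Compare against the normalized absolute base; with a relative or non-normalized
// base-dir setting, startsWith could otherwise give the wrong answer
if (!resolvedPath.startsWith(base)) {
throw new SecurityException("Path traversal detected");
}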
return switch (operation) {
case "read" -> readFile(resolvedPath);
case "list" -> listDirectory(resolvedPath);
case "write" -> writeFile(resolvedPath, (String) input.get("content"));
default -> throw new IllegalArgumentException("Unknown operation: " + operation);
};
}
@Override
public String getName() { return "file_operation"; }
@Override
public String getDescription() { return "Read, list, or write files within allowed directory"; }
}
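The base-directory check is the security-critical part of `FileToolExecutor`, so it helps to isolate it and test it directly. The sketch below assumes nothing beyond `java.nio.file`; `SafePaths.resolveInside` is a hypothetical helper name.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper mirroring the base-directory check in FileToolExecutor.
final class SafePaths {
    static Path resolveInside(String baseDir, String userPath) {
        Path base = Paths.get(baseDir).toAbsolutePath().normalize();
        // resolve() returns userPath itself when it is absolute, which then fails the check below.
        Path resolved = base.resolve(userPath).normalize();
        // After normalization, any ".." that escapes the base shows up as a different prefix.
        if (!resolved.startsWith(base)) {
            throw new SecurityException("Path traversal detected: " + userPath);
        }
        return resolved;
    }
}
```

`Path.startsWith` compares whole path components, so a sibling directory such as `/tmp/agent-evil` does not pass as being inside `/tmp/agent`. Symbolic links are not resolved here; if the base directory can contain links, comparing `toRealPath()` results would be the stricter option.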
Tool Registration Configuration
@Configuration
public class ToolRegistryConfig {
@Bean
public ToolRegistry toolRegistry(
HttpToolExecutor httpTool,
DatabaseToolExecutor dbTool,
FileToolExecutor fileTool) {
ToolRegistry registry = new ToolRegistry();
registry.register(ToolDescriptor.builder()
.name("http_request")
.description("Execute HTTP request to external API")
.inputSchema("""
{
"type": "object",
"properties": {
"url": {"type": "string", "format": "uri"},
"method": {"type": "string", "enum": ["GET","POST","PUT","DELETE"]},
"headers": {"type": "object"},
"body": {"type": "object"}
},
"required": ["url"]
}
""")
.build(), httpTool);
registry.register(ToolDescriptor.builder()
.name("database_query")
.description("Execute read-only SQL query")
.inputSchema("""
{
"type": "object",
"properties": {
"sql": {"type": "string"},
"params": {"type": "array"}
},
"required": ["sql"]
}
""")
.build(), dbTool);
registry.register(ToolDescriptor.builder()
.name("file_operation")
.description("File operations within allowed directory")
.inputSchema("""
{
"type": "object",
"properties": {
"operation": {"type": "string", "enum": ["read","list","write"]},
"path": {"type": "string"},
"content": {"type": "string"}
},
"required": ["operation", "path"]
}
""")
.build(), fileTool);
return registry;
}
}
State Machine: Managing Agent Execution State
State Definitions and Transitions
 create ──► CREATED ── start ──► RUNNING ── complete ──► COMPLETED
                                  │   ▲
                            pause │   │ resume
                                  ▼   │
                                 PAUSED

 RUNNING ── error ──► FAILED
 PAUSED  ── error ──► FAILED
Spring Statemachine Configuration
@Configuration
@EnableStateMachineFactory
public class WorkflowStateMachineConfig
extends EnumStateMachineConfigurerAdapter<WorkflowState, WorkflowEvent> {
@Override
public void configure(StateMachineStateConfigurer<WorkflowState, WorkflowEvent> states)
throws Exception {
states
.withStates()
.initial(WorkflowState.CREATED)
.states(EnumSet.allOf(WorkflowState.class))
.end(WorkflowState.COMPLETED)
.end(WorkflowState.FAILED);
}
@Override
public void configure(StateMachineTransitionConfigurer<WorkflowState, WorkflowEvent> transitions)
throws Exception {
transitions
.withExternal()
.source(WorkflowState.CREATED).target(WorkflowState.RUNNING)
.event(WorkflowEvent.START)
.and()
.withExternal()
.source(WorkflowState.RUNNING).target(WorkflowState.PAUSED)
.event(WorkflowEvent.PAUSE)
.and()
.withExternal()
.source(WorkflowState.PAUSED).target(WorkflowState.RUNNING)
.event(WorkflowEvent.RESUME)
.and()
.withExternal()
.source(WorkflowState.RUNNING).target(WorkflowState.COMPLETED)
.event(WorkflowEvent.COMPLETE)
.and()
.withExternal()
.source(WorkflowState.RUNNING).target(WorkflowState.FAILED)
.event(WorkflowEvent.ERROR)
.and()
.withExternal()
.source(WorkflowState.PAUSED).target(WorkflowState.FAILED)
.event(WorkflowEvent.ERROR);
}
}
public enum WorkflowState {
CREATED, RUNNING, PAUSED, COMPLETED, FAILED
}
public enum WorkflowEvent {
START, PAUSE, RESUME, COMPLETE, ERROR
}
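The six transitions configured above can also be written as a plain lookup table, which is handy for unit-testing the rules without starting a Spring context. This is a sketch and not a replacement for Spring Statemachine: `WorkflowTransitions` is a hypothetical class, and its nested enums simply mirror `WorkflowState` and `WorkflowEvent`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical plain-Java mirror of the transition table, without Spring Statemachine.
final class WorkflowTransitions {
    enum State { CREATED, RUNNING, PAUSED, COMPLETED, FAILED }
    enum Event { START, PAUSE, RESUME, COMPLETE, ERROR }

    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);
    static {
        add(State.CREATED, Event.START, State.RUNNING);
        add(State.RUNNING, Event.PAUSE, State.PAUSED);
        add(State.PAUSED, Event.RESUME, State.RUNNING);
        add(State.RUNNING, Event.COMPLETE, State.COMPLETED);
        add(State.RUNNING, Event.ERROR, State.FAILED);
        add(State.PAUSED, Event.ERROR, State.FAILED);
    }

    private static void add(State from, Event event, State to) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<>(Event.class)).put(event, to);
    }

    // Returns the next state, or throws if the event is not allowed in the current state.
    static State next(State current, Event event) {
        State target = TABLE.getOrDefault(current, Map.of()).get(event);
        if (target == null) {
            throw new IllegalStateException(event + " is not allowed in state " + current);
        }
        return target;
    }
}
```

COMPLETED and FAILED have no outgoing entries, which matches their role as end states in the configuration.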
Redis Checkpoint Persistence
@Component
public class RedisCheckpointManager implements CheckpointManager {
private final RedisTemplate<String, String> redisTemplate;
private final ObjectMapper objectMapper;
private static final String KEY_PREFIX = "workflow:checkpoint:";
@Override
public void saveCheckpoint(WorkflowContext context) {
String key = KEY_PREFIX + context.getExecutionId();
try {
String serialized = objectMapper.writeValueAsString(context);
redisTemplate.opsForValue().set(key, serialized, Duration.ofHours(24));
log.info("Checkpoint saved for execution: {}", context.getExecutionId());
} catch (JsonProcessingException e) {
throw new WorkflowException("Failed to save checkpoint", e);
}
}
@Override
public Optional<WorkflowContext> loadCheckpoint(String executionId) {
String key = KEY_PREFIX + executionId;
String serialized = redisTemplate.opsForValue().get(key);
if (serialized == null) {
return Optional.empty();
}
try {
return Optional.of(objectMapper.readValue(serialized, WorkflowContext.class));
} catch (JsonProcessingException e) {
throw new WorkflowException("Failed to load checkpoint", e);
}
}
@Override
public void deleteCheckpoint(String executionId) {
redisTemplate.delete(KEY_PREFIX + executionId);
}
}
Resuming Execution from a Checkpoint
@Service
public class WorkflowRecoveryService {
private final RedisCheckpointManager checkpointManager;
private final DAGScheduler dagScheduler;
private final WorkflowDefinitionRegistry definitionRegistry;
public WorkflowResult recoverFromCheckpoint(String executionId) {
WorkflowContext context = checkpointManager.loadCheckpoint(executionId)
.orElseThrow(() -> new WorkflowException(
"No checkpoint found for: " + executionId));
WorkflowDefinition workflow = definitionRegistry.get(context.getWorkflowId());
log.info("Recovering workflow {} from checkpoint, current state: {}",
context.getWorkflowId(), context.getCurrentState());
Set<String> completedNodes = context.getNodeResults().keySet();
WorkflowDefinition remainingWorkflow = filterRemainingNodes(workflow, completedNodes);
return dagScheduler.execute(remainingWorkflow, context).join();
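}
// Read-only lookup that WorkflowController calls to report execution status
public Optional<WorkflowContext> getCheckpoint(String executionId) {
return checkpointManager.loadCheckpoint(executionId);
}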
private WorkflowDefinition filterRemainingNodes(
WorkflowDefinition workflow, Set<String> completedNodes) {
List<NodeDefinition> remainingNodes = workflow.getNodes().stream()
.filter(n -> !completedNodes.contains(n.getNodeId()))
.toList();
List<EdgeDefinition> remainingEdges = workflow.getEdges().stream()
.filter(e -> !completedNodes.contains(e.getSourceId()))
.filter(e -> !completedNodes.contains(e.getTargetId()))
.toList();
return WorkflowDefinition.builder()
.workflowId(workflow.getWorkflowId())
.nodes(remainingNodes)
.edges(remainingEdges)
.build();
}
}
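To make the effect of `filterRemainingNodes` concrete, here is a small sketch that applies the same filtering rule to a toy graph. It assumes Java 17+ for records and `Stream.toList()`; `RemainingGraph`, `Edge`, and `Graph` are hypothetical stand-ins for the definition classes.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch using records in place of NodeDefinition/EdgeDefinition.
final class RemainingGraph {
    record Edge(String source, String target) {}
    record Graph(List<String> nodes, List<Edge> edges) {}

    // Keeps unfinished nodes and only the edges that connect two unfinished nodes.
    static Graph remaining(Graph full, Set<String> completed) {
        List<String> nodes = full.nodes().stream()
                .filter(n -> !completed.contains(n))
                .toList();
        List<Edge> edges = full.edges().stream()
                .filter(e -> !completed.contains(e.source()) && !completed.contains(e.target()))
                .toList();
        return new Graph(nodes, edges);
    }
}
```

Dropping every edge that starts at a completed node treats that dependency as already satisfied, so the first unfinished nodes become roots of the remaining graph. This is only sound because checkpoints are written after a whole level finishes, which means a completed node's predecessors are always completed too.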
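Multi-Agent Collaboration Modes
Comparison of the Four Collaboration Modes
| Mode | Best suited for | Concurrency | Latency | Consistency | Typical use cases |
|---|---|---|---|---|---|
| Sequential | Steps with strict dependencies | 1 | High | Strong | Approval flows, pipelines |
| Parallel | Independent tasks that can run concurrently | N | Low | Eventual | Multi-source data aggregation |
| Router | Conditional branch routing | 1 | Low | Strong | Intent dispatch, domain routing |
| Debate | Results that need multi-perspective validation | N | High | Voting/weighted | Code review, decision justification |
Collaboration Mode Interface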
public interface AgentCollaborationMode {
WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context);
String getModeName();
}
Sequential Mode
@Component
public class SequentialMode implements AgentCollaborationMode {
private final AgentExecutor agentExecutor;
@Override
public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
Object currentOutput = context.getVariable("input", Object.class);
for (AgentDefinition agent : agents) {
log.info("Sequential execution: agent={}, input type={}",
agent.getName(), currentOutput == null ? "null" : currentOutput.getClass().getSimpleName());
AgentResult result = agentExecutor.execute(agent, currentOutput, context);
currentOutput = result.getOutput();
context.setVariable("agent_" + agent.getName() + "_output", currentOutput);
}
context.setVariable("output", currentOutput);
return WorkflowResult.success(context);
}
@Override
public String getModeName() { return "sequential"; }
}
Parallel Mode
@Component
public class ParallelMode implements AgentCollaborationMode {
private final AgentExecutor agentExecutor;
@Override
public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
Object input = context.getVariable("input", Object.class);
List<CompletableFuture<AgentResult>> futures = agents.stream()
.map(agent -> CompletableFuture.supplyAsync(
() -> agentExecutor.execute(agent, input, context)))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
Map<String, Object> mergedOutput = new LinkedHashMap<>();
for (int i = 0; i < agents.size(); i++) {
AgentResult result = futures.get(i).join();
mergedOutput.put(agents.get(i).getName(), result.getOutput());
}
context.setVariable("output", mergedOutput);
return WorkflowResult.success(context);
}
@Override
public String getModeName() { return "parallel"; }
}
Router Mode
@Component
public class RouterMode implements AgentCollaborationMode {
private final AgentExecutor agentExecutor;
private final ChatClient chatClient;
@Override
public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
Object input = context.getVariable("input", Object.class);
String agentNames = agents.stream()
.map(AgentDefinition::getName)
.collect(Collectors.joining(", "));
String routingPrompt = """
Based on the input, select the most appropriate agent from: %s
Input: %s
Return ONLY the agent name, nothing else.
""".formatted(agentNames, input);
String selectedAgent = chatClient.prompt()
.user(routingPrompt)
.call()
.content();
AgentDefinition agent = agents.stream()
.filter(a -> a.getName().equals(selectedAgent.trim()))
.findFirst()
.orElse(agents.get(0));
log.info("Router selected agent: {}", agent.getName());
AgentResult result = agentExecutor.execute(agent, input, context);
context.setVariable("output", result.getOutput());
return WorkflowResult.success(context);
}
@Override
public String getModeName() { return "router"; }
}
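Models often wrap a requested name in quotes or add a trailing period, and with a strict `equals` comparison such a reply silently falls through to the first agent. One possible way to make the match more forgiving is sketched below. `AgentNameMatcher` is a hypothetical helper, and it returns an `Optional` so the caller decides explicitly what to do when nothing matches.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper: maps a free-form model reply onto one of the known agent names.
final class AgentNameMatcher {
    static Optional<String> match(String reply, List<String> agentNames) {
        if (reply == null) return Optional.empty();
        // Strip whitespace, leading quotes/backticks, and trailing quotes/backticks/periods.
        String cleaned = reply.trim().replaceAll("^[\"'`]+|[\"'`.]+$", "");
        return agentNames.stream()
                .filter(name -> name.equalsIgnoreCase(cleaned))
                .findFirst();
    }
}
```

Returning an empty result for an unrecognized reply makes it possible to log the miss or retry the routing prompt rather than defaulting to an arbitrary agent.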
Debate Mode
@Component
public class DebateMode implements AgentCollaborationMode {
private final AgentExecutor agentExecutor;
@Value("${workflow.debate.rounds:3}")
private int debateRounds;
@Override
public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
Object topic = context.getVariable("input", Object.class);
List<DebateRound> rounds = new ArrayList<>();
for (int round = 0; round < debateRounds; round++) {
log.info("Debate round {}/{}", round + 1, debateRounds);
List<AgentResult> roundResults = agents.stream()
.map(agent -> agentExecutor.execute(agent, topic, context))
.toList();
DebateRound debateRound = new DebateRound(round, roundResults);
rounds.add(debateRound);
topic = buildDebateContext(rounds);
}
Object finalDecision = voteOnResults(rounds);
context.setVariable("debate_rounds", rounds);
context.setVariable("output", finalDecision);
return WorkflowResult.success(context);
}
private Object voteOnResults(List<DebateRound> rounds) {
Map<String, Integer> voteCount = new HashMap<>();
for (DebateRound round : rounds) {
for (AgentResult result : round.results()) {
String conclusion = result.getOutput().toString();
voteCount.merge(conclusion, 1, Integer::sum);
}
}
return voteCount.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse("No consensus reached");
}
@Override
public String getModeName() { return "debate"; }
}
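Counting exact output strings works only when agents answer with short, canonical conclusions; free-form replies almost never match character for character. The sketch below shows one possible refinement, assuming each agent's conclusion has already been reduced to a short label: votes are normalized before counting, and a tie is reported as no consensus. `MajorityVote` is a hypothetical class, not part of `DebateMode`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: majority vote over the conclusions of a single round.
final class MajorityVote {
    static Optional<String> decide(List<String> conclusions) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String conclusion : conclusions) {
            // Normalize so "Approve" and " approve " count as the same vote.
            counts.merge(conclusion.trim().toLowerCase(Locale.ROOT), 1, Integer::sum);
        }
        String winner = null;
        int best = 0;
        boolean tie = false;
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > best) {
                winner = entry.getKey();
                best = entry.getValue();
                tie = false;
            } else if (entry.getValue() == best) {
                tie = true;
            }
        }
        // A tie (or no votes at all) is reported as "no consensus" instead of picking arbitrarily.
        return (winner == null || tie) ? Optional.empty() : Optional.of(winner);
    }
}
```

Applying the vote to the final round only, rather than to all rounds pooled together, would let agents change their minds during the debate; which of the two is appropriate depends on the use case.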
Deep Integration with Spring AI
Workflow DSL Definition
@Component
public class CustomerServiceWorkflow {
private final WorkflowEngine workflowEngine;
private final ToolRegistry toolRegistry;
public WorkflowDefinition build() {
return workflowEngine.workflow("customer-service", "1.0")
.node("classify_intent")
.type("llm")
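.property("prompt", "Classify the user's intent: order query / refund / complaint / transfer to a human agent")
.property("model", "gpt-4o")
.node("query_order")
.type("tool")
.property("toolName", "database_query")
// JdbcTemplate binds positional ? placeholders; the :orderId form would need NamedParameterJdbcTemplate
.property("sql", "SELECT * FROM orders WHERE order_id = ?")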
.node("query_knowledge")
.type("tool")
.property("toolName", "http_request")
.property("url", "http://knowledge-service/api/search")
.node("aggregate")
.type("transform")
.property("expression", "merge(orderData, knowledgeData)")
.node("generate_response")
.type("llm")
.property("prompt", "Generate a customer-service reply based on the following information: {{aggregatedData}}")
.edge("classify_intent", "query_order")
.condition("intent == 'query_order'")
.edge("classify_intent", "query_knowledge")
.condition("intent == 'knowledge_query'")
.edge("query_order", "aggregate")
.edge("query_knowledge", "aggregate")
.edge("aggregate", "generate_response")
.build();
}
}
Spring AI ChatClient Integration Node
@Component
public class LLMNodeExecutor implements NodeExecutor {
private final ChatClient chatClient;
private final ToolRegistry toolRegistry;
@Override
public CompletableFuture<NodeResult> execute(
NodeDefinition node, WorkflowContext context) {
String promptTemplate = (String) node.getProperties().get("prompt");
String model = (String) node.getProperties().getOrDefault("model", "gpt-4o");
boolean useTools = (boolean) node.getProperties().getOrDefault("useTools", false);
String resolvedPrompt = resolveVariables(promptTemplate, context);
ChatClient.ChatClientRequestSpec request = chatClient.prompt()
.user(resolvedPrompt)
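// Spring AI 1.0 GA exposes the portable builder as ChatOptions.builder();
// the withXxx-style ChatOptionsBuilder belongs to earlier milestone releases
.options(ChatOptions.builder()
.model(model)
.temperature(0.1)
.build());
if (useTools) {
// Note: in Spring AI 1.0, tools(...) expects objects with @Tool-annotated methods and
// toolCallbacks(...) expects ToolCallback instances, so raw JSON schema maps like these
// would need to be wrapped in ToolCallback adapters before being passed in
List<Map<String, Object>> toolSchemas = toolRegistry.getOpenAIToolSchemas();
request.tools(toolSchemas);
}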
String response = request.call().content();
NodeResult result = NodeResult.builder()
.nodeId(node.getNodeId())
.success(true)
.output(response)
.timestamp(Instant.now())
.build();
return CompletableFuture.completedFuture(result);
}
private String resolveVariables(String template, WorkflowContext context) {
Pattern pattern = Pattern.compile("\\{\\{(\\w+)}}");
Matcher matcher = pattern.matcher(template);
StringBuilder sb = new StringBuilder();
while (matcher.find()) {
Object value = context.getVariable(matcher.group(1), Object.class);
// quoteReplacement keeps '$' and '\' in the value from being read as group references
matcher.appendReplacement(sb,
Matcher.quoteReplacement(value != null ? value.toString() : ""));
}
matcher.appendTail(sb);
return sb.toString();
}
@Override
public String getSupportedType() { return "llm"; }
}
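Placeholder substitution with `Matcher.appendReplacement` has a well-known pitfall: `$` and `\` in the replacement string are interpreted as group references and escapes, so a value such as `$100` can throw or produce corrupted output unless it goes through `Matcher.quoteReplacement`. The standalone sketch below shows the safe form. `TemplateResolver` is a hypothetical class that takes a plain `Map` instead of `WorkflowContext`.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone resolver for {{name}} placeholders.
final class TemplateResolver {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{(\\w+)\\}\\}");

    static String resolve(String template, Map<String, Object> variables) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            Object value = variables.get(matcher.group(1));
            // Unknown variables resolve to an empty string.
            String text = value != null ? value.toString() : "";
            // quoteReplacement stops '$' and '\' in the value from being treated specially.
            matcher.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

Silently replacing unknown variables with an empty string keeps prompts renderable, but it can also hide a misspelled variable name; logging the miss is a cheap safeguard.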
Exposing the Workflow API via a REST Controller
@RestController
@RequestMapping("/api/v1/workflows")
public class WorkflowController {
private final WorkflowEngine workflowEngine;
private final WorkflowRecoveryService recoveryService;
@PostMapping("/{workflowId}/execute")
public ResponseEntity<WorkflowExecutionResponse> executeWorkflow(
@PathVariable String workflowId,
@RequestBody WorkflowExecutionRequest request) {
WorkflowContext context = WorkflowContext.builder()
.workflowId(workflowId)
.executionId(UUID.randomUUID().toString())
.variables(request.getInputs())
.build();
CompletableFuture<WorkflowResult> future = workflowEngine.execute(workflowId, context);
return ResponseEntity.accepted()
.body(WorkflowExecutionResponse.builder()
.executionId(context.getExecutionId())
.status(WorkflowState.RUNNING)
.build());
}
@GetMapping("/{workflowId}/executions/{executionId}")
public ResponseEntity<WorkflowExecutionResponse> getExecutionStatus(
@PathVariable String workflowId,
@PathVariable String executionId) {
Optional<WorkflowContext> checkpoint = recoveryService.getCheckpoint(executionId);
return checkpoint.map(ctx -> ResponseEntity.ok(
WorkflowExecutionResponse.builder()
.executionId(executionId)
.status(ctx.getCurrentState())
.nodeResults(ctx.getNodeResults())
.build()))
.orElse(ResponseEntity.notFound().build());
}
@PostMapping("/{workflowId}/executions/{executionId}/resume")
public ResponseEntity<WorkflowExecutionResponse> resumeWorkflow(
@PathVariable String workflowId,
@PathVariable String executionId) {
WorkflowResult result = recoveryService.recoverFromCheckpoint(executionId);
return ResponseEntity.ok(WorkflowExecutionResponse.builder()
.executionId(executionId)
.status(WorkflowState.COMPLETED)
.build());
}
}
Workflow Visualization and Debugging
Execution Trace Model
public class ExecutionTrace {
private String executionId;
private List<NodeTrace> nodeTraces;
private Instant startTime;
private Instant endTime;
private long totalDurationMs;
private WorkflowState finalState;
}
public class NodeTrace {
private String nodeId;
private String nodeName;
private Instant startTime;
private Instant endTime;
private long durationMs;
private NodeStatus status;
private String errorMessage;
private Map<String, Object> inputSnapshot;
private Map<String, Object> outputSnapshot;
private List<ToolCallTrace> toolCalls;
}
public class ToolCallTrace {
private String toolName;
private Instant callTime;
private long durationMs;
private Map<String, Object> arguments;
private Object result;
}
Audit Logging
@Aspect
@Component
public class WorkflowAuditAspect {
private final AuditLogRepository auditLogRepository;
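// Match NodeExecutor implementations only: a bare *Executor pattern also matches
// ToolExecutor.execute(Map), whose arguments cannot be cast to the types used here.
// Note that SUCCESS is recorded when the future is returned, not when it completes.
@Around("execution(* com.toolsku.workflow..NodeExecutor+.execute(..)) && args(node, context)")
public Object auditNodeExecution(ProceedingJoinPoint joinPoint,
NodeDefinition node, WorkflowContext context) throws Throwable {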
AuditLog auditLog = AuditLog.builder()
.executionId(context.getExecutionId())
.nodeId(node.getNodeId())
.nodeType(node.getType())
.startTime(Instant.now())
.build();
try {
Object result = joinPoint.proceed();
auditLog.setStatus("SUCCESS");
auditLog.setEndTime(Instant.now());
return result;
} catch (Exception e) {
auditLog.setStatus("FAILED");
auditLog.setErrorMessage(e.getMessage());
auditLog.setEndTime(Instant.now());
throw e;
} finally {
auditLogRepository.save(auditLog);
}
}
}
Generating Visualization Data
@Service
public class WorkflowVisualizationService {
public WorkflowGraphData generateGraphData(WorkflowDefinition workflow, ExecutionTrace trace) {
List<GraphNode> nodes = workflow.getNodes().stream()
.map(nodeDef -> {
NodeTrace nodeTrace = trace != null ?
trace.getNodeTraces().stream()
.filter(t -> t.getNodeId().equals(nodeDef.getNodeId()))
.findFirst().orElse(null) : null;
return GraphNode.builder()
.id(nodeDef.getNodeId())
.label(nodeDef.getNodeId())
.type(nodeDef.getType())
.status(nodeTrace != null ? nodeTrace.getStatus() : NodeStatus.PENDING)
.durationMs(nodeTrace != null ? nodeTrace.getDurationMs() : 0)
.build();
})
.toList();
List<GraphEdge> edges = workflow.getEdges().stream()
.map(edgeDef -> GraphEdge.builder()
.source(edgeDef.getSourceId())
.target(edgeDef.getTargetId())
.condition(edgeDef.getCondition())
.type(edgeDef.getType())
.build())
.toList();
return WorkflowGraphData.builder()
.nodes(nodes)
.edges(edges)
.build();
}
}
Fault Tolerance and Retry
RetryPolicy Definition
public class RetryPolicy {
private int maxAttempts = 3;
private Duration initialDelay = Duration.ofMillis(500);
private double multiplier = 2.0;
private Duration maxDelay = Duration.ofSeconds(30);
private Set<Class<? extends Throwable>> retryableExceptions = Set.of(
ToolExecutionException.class,
ModelTimeoutException.class,
RateLimitExceededException.class
);
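private boolean exponentialBackoff = true;
// Referenced by RetryableNodeExecutor when a node declares no policy of its own
public static RetryPolicy defaultPolicy() {
return new RetryPolicy();
}
}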
RetryableNodeExecutor
@Component
public class RetryableNodeExecutor implements NodeExecutor {
private final Map<String, NodeExecutor> executorMap;
private final MetricsCollector metricsCollector;
@Override
public CompletableFuture<NodeResult> execute(
NodeDefinition node, WorkflowContext context) {
NodeExecutor delegate = executorMap.get(node.getType());
RetryPolicy policy = node.getRetryPolicy() != null ?
node.getRetryPolicy() : RetryPolicy.defaultPolicy();
return CompletableFuture.supplyAsync(() -> {
int attempt = 0;
Throwable lastException = null;
while (attempt < policy.getMaxAttempts()) {
attempt++;
try {
NodeResult result = delegate.execute(node, context).join();
metricsCollector.recordRetrySuccess(node.getNodeId(), attempt);
return result;
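} catch (Throwable e) {
// join() wraps failures in CompletionException, so classify the underlying cause;
// otherwise no exception would ever match the retryable types
Throwable cause = (e instanceof CompletionException && e.getCause() != null)
? e.getCause() : e;
lastException = cause;
if (!isRetryable(cause, policy)) {
throw new WorkflowExecutionException(
"Non-retryable exception", cause);
}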
if (attempt < policy.getMaxAttempts()) {
Duration delay = calculateDelay(attempt, policy);
log.warn("Node {} failed (attempt {}/{}), retrying in {}ms: {}",
node.getNodeId(), attempt, policy.getMaxAttempts(),
delay.toMillis(), e.getMessage());
sleep(delay);
}
}
}
metricsCollector.recordRetryExhausted(node.getNodeId(), attempt);
throw new WorkflowExecutionException(
"Node " + node.getNodeId() + " failed after " + attempt + " attempts",
lastException);
});
}
private Duration calculateDelay(int attempt, RetryPolicy policy) {
if (!policy.isExponentialBackoff()) {
return policy.getInitialDelay();
}
long delayMs = (long) (policy.getInitialDelay().toMillis()
* Math.pow(policy.getMultiplier(), attempt - 1));
delayMs = Math.min(delayMs, policy.getMaxDelay().toMillis());
return Duration.ofMillis(delayMs);
}
private boolean isRetryable(Throwable e, RetryPolicy policy) {
return policy.getRetryableExceptions().stream()
.anyMatch(exType -> exType.isInstance(e));
}
@Override
public String getSupportedType() { return "retryable_wrapper"; }
}
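The backoff formula in `calculateDelay` is easy to check in isolation. The sketch below reproduces it as a pure function and lists the resulting schedule; `Backoff` is a hypothetical helper, and the values in the usage note are the defaults from `RetryPolicy` (500 ms initial delay, multiplier 2.0, 30 s cap).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the exponential backoff formula with a cap.
final class Backoff {
    // attempt is 1-based: the delay applied after the first failure uses attempt = 1.
    static Duration delay(int attempt, Duration initial, double multiplier, Duration max) {
        double raw = initial.toMillis() * Math.pow(multiplier, attempt - 1);
        // Cap in floating point before narrowing, so very large attempts cannot overflow a long.
        long capped = (long) Math.min(raw, (double) max.toMillis());
        return Duration.ofMillis(capped);
    }

    // Delays in milliseconds for attempts 1..attempts.
    static List<Long> scheduleMillis(int attempts, Duration initial, double multiplier, Duration max) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= attempts; attempt++) {
            delays.add(delay(attempt, initial, multiplier, max).toMillis());
        }
        return delays;
    }
}
```

With the default policy the delays grow as 500, 1000, 2000, 4000, 8000, and 16000 ms, then stay at the 30000 ms cap. Since `maxAttempts` defaults to 3 and no sleep follows the final attempt, a node using the defaults waits only 500 ms and then 1000 ms before giving up.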
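CircuitBreaker Integration
@Configuration
// Named differently from Resilience4j's CircuitBreakerConfig, which is used inside this class;
// sharing the name would make CircuitBreakerConfig.custom() resolve to this class and fail to compile
public class WorkflowCircuitBreakerConfig {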
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(80)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slowCallDurationThreshold(Duration.ofSeconds(5))
.permittedNumberOfCallsInHalfOpenState(3)
.slidingWindowSize(10)
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.build();
return CircuitBreakerRegistry.of(config);
}
@Bean
public CircuitBreaker llmCircuitBreaker(CircuitBreakerRegistry registry) {
return registry.circuitBreaker("llm-calls");
}
}
Production Deployment and Performance Optimization
Monitoring Metrics Checklist
| Category | Metric | Type | Description |
|---|---|---|---|
| Execution | workflow_execution_total | Counter | Total workflow executions |
| Execution | workflow_execution_duration | Histogram | Distribution of execution time |
| Execution | workflow_execution_active | Gauge | Currently active executions |
| Node | node_execution_total | Counter | Total node executions |
| Node | node_execution_duration | Histogram | Distribution of node execution time |
| Node | node_execution_errors | Counter | Node error count |
| Tool | tool_call_total | Counter | Total tool calls |
| Tool | tool_call_duration | Histogram | Tool call duration |
| LLM | llm_token_input | Counter | Input tokens consumed |
| LLM | llm_token_output | Counter | Output tokens consumed |
| LLM | llm_request_latency | Histogram | LLM request latency |
| Retry | retry_attempts_total | Counter | Total retry attempts |
| Retry | retry_exhausted_total | Counter | Number of times retries were exhausted |
Micrometer指标采集
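@Component
public class WorkflowMetricsCollector {
private final MeterRegistry meterRegistry;
// Constructor injection: Spring supplies the MeterRegistry bean, and the final field is initialized.
public WorkflowMetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}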
public void recordWorkflowExecution(String workflowId, Duration duration, boolean success) {
Counter.builder("workflow.execution.total")
.tag("workflow", workflowId)
.tag("success", String.valueOf(success))
.register(meterRegistry).increment();
Timer.builder("workflow.execution.duration")
.tag("workflow", workflowId)
.register(meterRegistry)
.record(duration);
}
public void recordNodeExecution(String nodeId, String nodeType,
Duration duration, boolean success) {
Counter.builder("node.execution.total")
.tag("node", nodeId)
.tag("type", nodeType)
.tag("success", String.valueOf(success))
.register(meterRegistry).increment();
Timer.builder("node.execution.duration")
.tag("node", nodeId)
.tag("type", nodeType)
.register(meterRegistry)
.record(duration);
}
public void recordTokenUsage(String model, int inputTokens, int outputTokens) {
Counter.builder("llm.token.input")
.tag("model", model)
.register(meterRegistry).increment(inputTokens);
Counter.builder("llm.token.output")
.tag("model", model)
.register(meterRegistry).increment(outputTokens);
}
}
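Performance Optimization Checklist
| Optimization | Before | After | Method |
|---|---|---|---|
| LLM call latency | 2.5 s/call | 0.8 s/call | Semantic cache (identical or near-identical prompts hit the cache) |
| Workflow startup | 500 ms | 50 ms | Precompiled DAG + object pooling |
| Checkpoint save | 200 ms | 30 ms | Asynchronous batched writes via Redis pipelining |
| Data passing between nodes | Serialization overhead | Zero-copy | Pass by reference + copy-on-write |
| Parallel node scheduling | Sequential submission | True parallelism | ForkJoinPool + work stealing |
| Tool schema generation | Reflection on every call | Once | Pregenerated at startup + cached |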
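As one illustration of the tool schema row, the sketch below derives a schema by reflection once per tool class and serves every later request from a `ConcurrentHashMap`. The class `ToolSchemaCacheSketch`, the toy schema format, and the `WeatherTool` class are all hypothetical stand-ins; a real engine would produce a full JSON Schema instead.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ToolSchemaCacheSketch {
    private final Map<Class<?>, String> cache = new ConcurrentHashMap<>();
    // Counts how often the reflective path actually runs (for demonstration only).
    final AtomicInteger reflectionRuns = new AtomicInteger();

    /** Call once at startup so no request pays the reflection cost. */
    public void warmUp(Class<?>... toolClasses) {
        for (Class<?> toolClass : toolClasses) {
            schemaFor(toolClass);
        }
    }

    /** Returns the cached schema, building it by reflection only on first access. */
    public String schemaFor(Class<?> toolClass) {
        return cache.computeIfAbsent(toolClass, this::buildSchema);
    }

    // Toy schema: "ClassName[method1,method2]"; stands in for real JSON Schema generation.
    private String buildSchema(Class<?> toolClass) {
        reflectionRuns.incrementAndGet();
        return Arrays.stream(toolClass.getDeclaredMethods())
                .filter(method -> !method.isSynthetic())
                .map(Method::getName)
                .sorted()
                .collect(Collectors.joining(",", toolClass.getSimpleName() + "[", "]"));
    }

    /** Hypothetical tool used only for this example. */
    static class WeatherTool {
        public String forecast(String city) {
            return "sunny in " + city;
        }
    }

    public static void main(String[] args) {
        ToolSchemaCacheSketch schemas = new ToolSchemaCacheSketch();
        schemas.warmUp(WeatherTool.class);
        System.out.println(schemas.schemaFor(WeatherTool.class)); // prints WeatherTool[forecast]
        System.out.println(schemas.reflectionRuns.get());         // prints 1
    }
}
```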
Semantic Cache Implementation
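import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Component
public class SemanticCache {
    private static final Logger log = LoggerFactory.getLogger(SemanticCache.class);
    private final VectorStore vectorStore;
    @Value("${workflow.cache.similarity-threshold:0.95}")
    private double similarityThreshold;

    public SemanticCache(VectorStore vectorStore) {
        this.vectorStore = vectorStore;
    }

    public Optional<String> get(String prompt) {
        // The VectorStore embeds the query text itself, so no explicit EmbeddingModel call is needed.
        List<Document> results = vectorStore.similaritySearch(
            SearchRequest.builder()
                .query(prompt)
                .topK(1)
                .similarityThreshold(similarityThreshold)
                .build());
        if (!results.isEmpty()) {
            Document doc = results.get(0);
            // The "distance" metadata key depends on the VectorStore implementation.
            log.info("Semantic cache hit for prompt (distance: {})",
                doc.getMetadata().get("distance"));
            return Optional.ofNullable((String) doc.getMetadata().get("response"));
        }
        return Optional.empty();
    }

    public void put(String prompt, String response) {
        // Index the prompt as the document content so lookups compare prompt against prompt;
        // the cached response travels as metadata.
        Document doc = new Document(prompt,
            Map.of("response", response, "timestamp", Instant.now().toString()));
        vectorStore.add(List.of(doc));
    }
}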
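The 0.95 default threshold is easier to reason about with numbers. The sketch below assumes the vector store scores matches by cosine similarity (a common default, but store-dependent) and uses tiny made-up 2-dimensional vectors in place of real embeddings. The class and method names are hypothetical.

```java
public class CosineSimilaritySketch {

    /** Cosine similarity of two equal-length vectors; returns 0 if either has zero length. */
    static double cosine(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Vectors must have the same dimension");
        }
        double dot = 0.0, normA = 0.0, normB = 0.0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        if (normA == 0.0 || normB == 0.0) {
            return 0.0;
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** A cached entry counts as a hit when its similarity reaches the threshold. */
    static boolean isCacheHit(float[] query, float[] cached, double threshold) {
        return cosine(query, cached) >= threshold;
    }

    public static void main(String[] args) {
        float[] cached = {1.0f, 0.0f};
        float[] nearDuplicate = {0.99f, 0.1f}; // similarity of roughly 0.995
        float[] different = {0.6f, 0.8f};      // similarity of roughly 0.6
        System.out.println(isCacheHit(nearDuplicate, cached, 0.95)); // prints true
        System.out.println(isCacheHit(different, cached, 0.95));     // prints false
    }
}
```

A threshold this high trades hit rate for safety: lowering it raises the hit rate but risks answering a different question with a cached response.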
Docker Compose Deployment Configuration
version: '3.8'
services:
  workflow-engine:
    image: toolsku/workflow-engine:1.0.0
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=production
      - SPRING_AI_OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_HOST=redis
      - POSTGRES_HOST=postgres
    depends_on:
      - redis
      - postgres
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '4'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: workflow
      POSTGRES_USER: workflow
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    ports:
      - "5432:5432"
  prometheus:
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
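Summary: Decision Matrix for Building Your Own vs. Using a Framework
| Dimension | Custom engine | LangGraph | Spring AI | Dify |
|---|---|---|---|---|
| DAG workflows | ✅ Full control | ✅ | ❌ | ✅ |
| State persistence | ✅ Redis | ⚠️ Build your own | ❌ | ✅ |
| Multi-Agent | ✅ 4 modes | ⚠️ Basic | ❌ | ⚠️ |
| Java ecosystem | ✅ Native | ❌ Python | ✅ | ❌ |
| Observability | ✅ Micrometer | ✅ LangSmith | ⚠️ | ✅ |
| Learning curve | High | Medium | Low | Low |
| Maintenance cost | High | Medium | Low | Low |
| Customization flexibility | ✅ Highest | Medium | Medium | Low |
Recommendations:
- Small and mid-sized companies / rapid validation → use Dify or Spring AI; don't reinvent the wheel
- Java stack / deep customization required → build your own engine; the architecture in this article can be put into practice directly
- Python teams / existing LangChain experience → LangGraph, which has the most mature ecosystem
The core value of a custom engine is not that it is "better than the alternatives" but that it is "fully under your control": when your Agent must integrate deeply with internal microservices, legacy systems, and custom middleware, building your own is the only option.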