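Building an AI Agent Workflow Engine from Scratch: A Production-Grade Agent Orchestration System in Java

Technical Architecture

Why Build Your Own AI Agent Workflow Engine?

By 2026, AI agents have moved from toys to production workloads, but the limitations of existing frameworks are increasingly apparent: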

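Framework | Language | Workflow model | DAG support | State persistence | Enterprise-ready
LangChain/LangGraph | Python | State machine | ❌ (build your own) ⚠️
CrewAI | Python | Sequential/hierarchical
AutoGen | Python | Conversational
Spring AI | Java | Linear chain | ⚠️
Dify | Python/Go | Visual DAG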

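Core pain points:

  1. The LangChain ecosystem is locked to Python: Java enterprise projects cannot reuse it directly, and cross-language calls add serialization overhead and make debugging painful.
  2. Spring AI only supports linear chains: the ChatClient → Tool → Advisor call chain cannot express complex branching or parallel logic.
  3. State management is missing: if an agent crashes halfway through execution, it cannot resume from where it stopped.
  4. Multi-agent collaboration is primitive: there are no built-in Sequential, Parallel, Router, or Debate collaboration modes.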

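Real-world case: a financial company built a risk-control agent on LangGraph and hit a concurrency bottleneck attributed to Python's GIL, with a single machine reaching only 120 QPS. After migrating to an in-house Java engine, QPS rose to 1,800.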


Agent Workflow Engine Architecture

Five-Layer Architecture Overview

┌──────────────────────────────────────────────────────────────────┐
│                       API layer (Layer 1)                        │
│   REST Controller │ GraphQL │ WebSocket / SSE (streaming)        │
├──────────────────────────────────────────────────────────────────┤
│                  Orchestration layer (Layer 2)                   │
│   WorkflowEngine │ DAGScheduler │ ConditionRouter │ LoopHandler  │
├──────────────────────────────────────────────────────────────────┤
│                       Node layer (Layer 3)                       │
│   LLMNode │ ToolNode │ TransformNode │ SubWorkflowNode           │
│   HumanApprovalNode │ ParallelGroupNode │ RouterNode             │
├──────────────────────────────────────────────────────────────────┤
│                 State management layer (Layer 4)                 │
│   StateMachine (Spring Statemachine) │ CheckpointManager         │
│   Redis persistence │ Execution context (WorkflowContext)        │
├──────────────────────────────────────────────────────────────────┤
│                  Infrastructure layer (Layer 5)                  │
│   ToolRegistry │ ModelRegistry │ EventBus │ AuditLogger          │
│   MetricsCollector │ RetryPolicy │ CircuitBreaker                │
└──────────────────────────────────────────────────────────────────┘

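Core Domain Model

// Getters, setters, builders, constructors for final fields, and the `log` field are
// omitted from the listings in this article; they are assumed to be generated,
// for example by Lombok (@Data, @Builder, @RequiredArgsConstructor, @Slf4j).
public class WorkflowDefinition {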
    private String workflowId;
    private String name;
    private String version;
    private List<NodeDefinition> nodes;
    private List<EdgeDefinition> edges;
    private Map<String, Object> metadata;
    private WorkflowConfig config;
}

public class NodeDefinition {
    private String nodeId;
    private String type;
    private Map<String, Object> properties;
    private RetryPolicy retryPolicy;
    private TimeoutConfig timeout;
}

public class EdgeDefinition {
    private String sourceId;
    private String targetId;
    private String condition;
    private EdgeType type;
}

public enum EdgeType {
    NORMAL, CONDITIONAL, PARALLEL, LOOP
}

Workflow Execution Context

public class WorkflowContext {
    private String executionId;
    private String workflowId;
    private Map<String, Object> variables;
    private Map<String, NodeResult> nodeResults;
    private WorkflowState currentState;
    private Instant startTime;
    private String checkpointKey;

    public <T> T getVariable(String key, Class<T> type) {
        Object value = variables.get(key);
        return type.cast(value);
    }

    public void setVariable(String key, Object value) {
        variables.put(key, value);
    }

    public NodeResult getNodeResult(String nodeId) {
        return nodeResults.get(nodeId);
    }
}

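Core Implementation of the DAG Task Scheduler

A DAG (directed acyclic graph) is the scheduling core of the workflow engine. Three problems need to be solved: cycle detection, topological sorting, and parallel execution of nodes that do not depend on each other.

Cycle Detection Algorithm

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Component;

@Component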
public class CycleDetector {

    public boolean hasCycle(List<NodeDefinition> nodes, List<EdgeDefinition> edges) {
        Map<String, Set<String>> adjacency = buildAdjacencyMap(nodes, edges);
        Set<String> visited = new HashSet<>();
        Set<String> recursionStack = new HashSet<>();

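        for (NodeDefinition node : nodes) {
            // Skip nodes already explored by an earlier DFS so the check stays O(V + E).
            if (!visited.contains(node.getNodeId())
                    && dfsCycleCheck(node.getNodeId(), adjacency, visited, recursionStack)) {
                return true;
            }
        }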
        return false;
    }

    private boolean dfsCycleCheck(String nodeId,
                                   Map<String, Set<String>> adjacency,
                                   Set<String> visited,
                                   Set<String> recursionStack) {
        visited.add(nodeId);
        recursionStack.add(nodeId);

        for (String neighbor : adjacency.getOrDefault(nodeId, Set.of())) {
            if (!visited.contains(neighbor)) {
                if (dfsCycleCheck(neighbor, adjacency, visited, recursionStack)) {
                    return true;
                }
            } else if (recursionStack.contains(neighbor)) {
                return true;
            }
        }

        recursionStack.remove(nodeId);
        return false;
    }

    private Map<String, Set<String>> buildAdjacencyMap(
            List<NodeDefinition> nodes, List<EdgeDefinition> edges) {
        Map<String, Set<String>> adjacency = new HashMap<>();
        for (NodeDefinition node : nodes) {
            adjacency.put(node.getNodeId(), new HashSet<>());
        }
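        for (EdgeDefinition edge : edges) {
            // computeIfAbsent avoids a NullPointerException when an edge
            // references a source node that was never declared.
            adjacency.computeIfAbsent(edge.getSourceId(), k -> new HashSet<>())
                     .add(edge.getTargetId());
        }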
        return adjacency;
    }
}
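
When a workflow is rejected, it usually helps to tell the author which nodes form the cycle rather than only returning a boolean. The following is a minimal sketch of that idea, not part of the engine above: it works on plain string IDs and an adjacency map instead of NodeDefinition/EdgeDefinition, and the names CyclePathFinder and findCycle are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CyclePathFinder {

    /** Returns the node IDs along one cycle (first == last), or an empty list if the graph is acyclic. */
    static List<String> findCycle(Map<String, List<String>> adjacency) {
        Set<String> done = new HashSet<>();        // nodes whose whole subtree has been explored
        Deque<String> path = new ArrayDeque<>();   // current DFS path, oldest node first
        Set<String> onPath = new HashSet<>();      // same content as path, for O(1) lookups
        for (String start : adjacency.keySet()) {
            List<String> cycle = dfs(start, adjacency, done, path, onPath);
            if (!cycle.isEmpty()) {
                return cycle;
            }
        }
        return List.of();
    }

    private static List<String> dfs(String node, Map<String, List<String>> adjacency,
                                    Set<String> done, Deque<String> path, Set<String> onPath) {
        if (done.contains(node)) {
            return List.of();
        }
        path.addLast(node);
        onPath.add(node);
        for (String next : adjacency.getOrDefault(node, List.of())) {
            if (onPath.contains(next)) {
                // Back edge: the cycle is the part of the path starting at `next`.
                List<String> cycle = new ArrayList<>();
                boolean inCycle = false;
                for (String p : path) {
                    if (p.equals(next)) {
                        inCycle = true;
                    }
                    if (inCycle) {
                        cycle.add(p);
                    }
                }
                cycle.add(next);
                return cycle;
            }
            List<String> cycle = dfs(next, adjacency, done, path, onPath);
            if (!cycle.isEmpty()) {
                return cycle;
            }
        }
        path.removeLast();
        onPath.remove(node);
        done.add(node);
        return List.of();
    }
}
```

For the edges a → b, b → c, c → a (inserted in that order into a LinkedHashMap), findCycle returns [a, b, c, a], which can be placed directly in the validation error message.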

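Topological Sorting and Level Assignment

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Component;

@Component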
public class TopologicalSorter {

    public List<Set<String>> sortIntoLevels(List<NodeDefinition> nodes,
                                             List<EdgeDefinition> edges) {
        Map<String, Set<String>> inEdges = new HashMap<>();
        Map<String, Set<String>> outEdges = new HashMap<>();

        for (NodeDefinition node : nodes) {
            inEdges.put(node.getNodeId(), new HashSet<>());
            outEdges.put(node.getNodeId(), new HashSet<>());
        }

        for (EdgeDefinition edge : edges) {
            outEdges.get(edge.getSourceId()).add(edge.getTargetId());
            inEdges.get(edge.getTargetId()).add(edge.getSourceId());
        }

        List<Set<String>> levels = new ArrayList<>();
        Set<String> processed = new HashSet<>();

        while (processed.size() < nodes.size()) {
            Set<String> currentLevel = new HashSet<>();
            for (NodeDefinition node : nodes) {
                String nodeId = node.getNodeId();
                if (!processed.contains(nodeId) &&
                    inEdges.get(nodeId).stream().allMatch(processed::contains)) {
                    currentLevel.add(nodeId);
                }
            }
            if (currentLevel.isEmpty()) {
                throw new WorkflowException("DAG contains cycle, cannot sort");
            }
            levels.add(currentLevel);
            processed.addAll(currentLevel);
        }

        return levels;
    }
}

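DAG Scheduler: Running Independent Nodes in Parallel

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service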
public class DAGScheduler {

    private final TopologicalSorter sorter;
    private final CycleDetector cycleDetector;
    private final NodeExecutorFactory executorFactory;
    private final CheckpointManager checkpointManager;

    @Async("workflowExecutor")
    public CompletableFuture<WorkflowResult> execute(
            WorkflowDefinition workflow, WorkflowContext context) {

        if (cycleDetector.hasCycle(workflow.getNodes(), workflow.getEdges())) {
            throw new WorkflowException("Workflow contains cycle");
        }

        List<Set<String>> levels = sorter.sortIntoLevels(
            workflow.getNodes(), workflow.getEdges());

        for (int i = 0; i < levels.size(); i++) {
            Set<String> levelNodes = levels.get(i);
            log.info("Executing level {} with {} nodes: {}", i, levelNodes.size(), levelNodes);

            List<CompletableFuture<NodeResult>> futures = levelNodes.stream()
                .map(nodeId -> executeNode(workflow, nodeId, context))
                .toList();

            CompletableFuture<Void> levelFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));

            levelFuture.join();

            for (CompletableFuture<NodeResult> future : futures) {
                NodeResult result = future.join();
                context.getNodeResults().put(result.getNodeId(), result);
            }

            checkpointManager.saveCheckpoint(context);
        }

        return CompletableFuture.completedFuture(
            WorkflowResult.success(context));
    }

    private CompletableFuture<NodeResult> executeNode(
            WorkflowDefinition workflow, String nodeId, WorkflowContext context) {
        NodeDefinition nodeDef = workflow.getNodes().stream()
            .filter(n -> n.getNodeId().equals(nodeId))
            .findFirst()
            .orElseThrow();

        NodeExecutor executor = executorFactory.getExecutor(nodeDef.getType());
        return executor.execute(nodeDef, context);
    }
}
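
The scheduler above waits for a level with an unbounded join(). As a rough sketch of one alternative, the helper below runs a level on a caller-supplied executor and gives up after a timeout. It is deliberately independent of the engine's types: LevelRunner, runLevel, and the nodeTask function are hypothetical names, and a node is represented only by its ID.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class LevelRunner {

    /** Runs every node of one level concurrently and returns nodeId -> result once all have finished. */
    static <R> Map<String, R> runLevel(Set<String> nodeIds,
                                       Function<String, R> nodeTask,
                                       ExecutorService pool,
                                       long timeoutSeconds) {
        Map<String, CompletableFuture<R>> futures = new LinkedHashMap<>();
        for (String id : nodeIds) {
            // Each node runs on the dedicated pool rather than the common ForkJoinPool.
            futures.put(id, CompletableFuture.supplyAsync(() -> nodeTask.apply(id), pool));
        }
        try {
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
                .get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the level", e);
        } catch (ExecutionException | TimeoutException e) {
            // One node failed or the level took too long: stop waiting for the rest.
            futures.values().forEach(f -> f.cancel(true));
            throw new IllegalStateException("Level failed or timed out", e);
        }
        Map<String, R> results = new LinkedHashMap<>();
        futures.forEach((id, f) -> results.put(id, f.join()));
        return results;
    }
}
```

Passing the executor in keeps slow, blocking LLM and tool calls off shared pools, and the timeout turns a hung node into a failure the caller can record before checkpointing.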

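How the scheduler walks the graph:

Level 0:  [classify intent]                     ← no dependencies, runs immediately
Level 1:  [query order] [query knowledge base]  ← depends on Level 0, runs in parallel
Level 2:  [aggregate results]                   ← waits for all of Level 1 to finish
Level 3:  [generate reply]                      ← depends on Level 2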
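
The level assignment shown above can be reproduced with a small standalone sketch. It uses Kahn's algorithm with in-degree counters, which visits each edge once, instead of re-scanning all nodes per level as TopologicalSorter does. LevelSortDemo and its string-based node and edge representation are hypothetical simplifications for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LevelSortDemo {

    /** Groups node IDs into levels; every node depends only on nodes in earlier levels. */
    static List<List<String>> levels(List<String> nodes, List<String[]> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        Map<String, List<String>> out = new HashMap<>();
        for (String n : nodes) {
            inDegree.put(n, 0);
            out.put(n, new ArrayList<>());
        }
        for (String[] e : edges) {               // e[0] -> e[1]
            out.get(e[0]).add(e[1]);
            inDegree.merge(e[1], 1, Integer::sum);
        }

        List<String> current = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                current.add(entry.getKey());     // no dependencies: level 0
            }
        }

        List<List<String>> result = new ArrayList<>();
        int placed = 0;
        while (!current.isEmpty()) {
            result.add(current);
            placed += current.size();
            List<String> next = new ArrayList<>();
            for (String n : current) {
                for (String m : out.get(n)) {
                    // A node becomes runnable once its last dependency has been placed.
                    if (inDegree.merge(m, -1, Integer::sum) == 0) {
                        next.add(m);
                    }
                }
            }
            current = next;
        }
        if (placed < nodes.size()) {
            throw new IllegalArgumentException("Graph contains a cycle");
        }
        return result;
    }
}
```

Fed the five customer-service nodes and their five edges, it returns [[classify_intent], [query_order, query_knowledge], [aggregate], [generate_response]], matching the four levels listed above.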

Tool Registration and Invocation Framework

ToolRegistry: A Unified Tool Registry

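// Deliberately not annotated with @Service: the single instance is created by the
// @Bean method in ToolRegistryConfig, and two ToolRegistry beans would make injection ambiguous.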
public class ToolRegistry {

    private final Map<String, ToolDescriptor> tools = new ConcurrentHashMap<>();
    private final Map<String, ToolExecutor> executors = new ConcurrentHashMap<>();

    public void register(ToolDescriptor descriptor, ToolExecutor executor) {
        tools.put(descriptor.getName(), descriptor);
        executors.put(descriptor.getName(), executor);
        log.info("Registered tool: {} with schema: {}",
            descriptor.getName(), descriptor.getInputSchema());
    }

    public ToolExecutionResult execute(String toolName, Map<String, Object> input) {
        ToolExecutor executor = executors.get(toolName);
        if (executor == null) {
            throw new ToolNotFoundException("Tool not found: " + toolName);
        }
        ToolDescriptor descriptor = tools.get(toolName);
        validateInput(descriptor, input);
        return executor.execute(input);
    }

    public List<ToolDescriptor> getAllTools() {
        return new ArrayList<>(tools.values());
    }

    public List<Map<String, Object>> getOpenAIToolSchemas() {
        return tools.values().stream()
            .map(this::convertToOpenAISchema)
            .toList();
    }

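    private static final ObjectMapper JSON = new ObjectMapper();

    // Assumes the networknt json-schema-validator library, whose validate(..) takes a
    // Jackson JsonNode; the input map is therefore converted to a tree first.
    private void validateInput(ToolDescriptor descriptor, Map<String, Object> input) {
        JsonSchema schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7)
            .getSchema(descriptor.getInputSchema());
        Set<ValidationMessage> errors = schema.validate(JSON.valueToTree(input));
        if (!errors.isEmpty()) {
            throw new ToolValidationException(
                "Input validation failed: " + errors);
        }
    }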
}
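
To see the register, validate, execute flow in isolation, here is a dependency-free sketch. It replaces JSON Schema validation with a simple required-keys check and models a tool as a plain function; MiniToolRegistry and its Tool record are hypothetical stand-ins, not the ToolRegistry defined above.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class MiniToolRegistry {

    /** A tool is the set of input keys it requires plus the function that runs it. */
    record Tool(Set<String> requiredKeys, Function<Map<String, Object>, Object> body) {}

    private final Map<String, Tool> tools = new ConcurrentHashMap<>();

    void register(String name, Set<String> requiredKeys,
                  Function<Map<String, Object>, Object> body) {
        // putIfAbsent makes accidental double registration visible instead of silently overwriting.
        if (tools.putIfAbsent(name, new Tool(requiredKeys, body)) != null) {
            throw new IllegalStateException("Tool already registered: " + name);
        }
    }

    Object execute(String name, Map<String, Object> input) {
        Tool tool = tools.get(name);
        if (tool == null) {
            throw new NoSuchElementException("Tool not found: " + name);
        }
        // Validate before executing, mirroring validateInput(..) in the real registry.
        for (String key : tool.requiredKeys()) {
            if (!input.containsKey(key)) {
                throw new IllegalArgumentException("Missing required input: " + key);
            }
        }
        return tool.body().apply(input);
    }
}
```

A caller registers a tool once, for example `registry.register("echo", Set.of("text"), in -> in.get("text"))`, and every later `execute("echo", ...)` call is checked against the declared keys before the tool body runs.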

The ToolExecutor Interface and Implementations

public interface ToolExecutor {
    ToolExecutionResult execute(Map<String, Object> input);
    String getName();
    String getDescription();
}

@Component
public class HttpToolExecutor implements ToolExecutor {

    private final RestTemplate restTemplate;

    @Override
    public ToolExecutionResult execute(Map<String, Object> input) {
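        // The URL originates from model output, so restrict it to an allow-list of hosts
        // before calling out; otherwise the tool can be steered at internal services (SSRF).
        String url = (String) input.get("url");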
        String method = (String) input.getOrDefault("method", "GET");
        Map<String, String> headers = (Map<String, String>) input.getOrDefault("headers", Map.of());
        Object body = input.get("body");

        HttpHeaders httpHeaders = new HttpHeaders();
        headers.forEach(httpHeaders::set);

        HttpEntity<Object> entity = new HttpEntity<>(body, httpHeaders);

        ResponseEntity<String> response = restTemplate.exchange(
            url, HttpMethod.valueOf(method), entity, String.class);

        return ToolExecutionResult.builder()
            .success(true)
            .data(response.getBody())
            .metadata(Map.of("statusCode", response.getStatusCode().value()))
            .build();
    }

    @Override
    public String getName() { return "http_request"; }

    @Override
    public String getDescription() { return "Execute HTTP request to external API"; }
}

@Component
public class DatabaseToolExecutor implements ToolExecutor {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public ToolExecutionResult execute(Map<String, Object> input) {
        String sql = (String) input.get("sql");
        List<Object> params = (List<Object>) input.getOrDefault("params", List.of());

        if (!isSafeQuery(sql)) {
            throw new SecurityException("Unsafe SQL query blocked: " + sql);
        }

        List<Map<String, Object>> results = jdbcTemplate.queryForList(sql, params.toArray());

        return ToolExecutionResult.builder()
            .success(true)
            .data(results)
            .build();
    }

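    private boolean isSafeQuery(String sql) {
        if (sql == null) {
            return false;
        }
        String upper = sql.toUpperCase(java.util.Locale.ROOT).trim();
        // Allow a single SELECT statement only: reject stacked statements, and match
        // write/DDL keywords as whole words so columns such as UPDATED_AT still pass.
        // This is a coarse filter; a read-only database account remains the real safeguard.
        return upper.startsWith("SELECT")
            && !upper.contains(";")
            && !upper.matches("(?s).*\\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|MERGE|GRANT)\\b.*");
    }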

    @Override
    public String getName() { return "database_query"; }

    @Override
    public String getDescription() { return "Execute read-only SQL query"; }
}

@Component
public class FileToolExecutor implements ToolExecutor {

    @Value("${tool.file.base-dir}")
    private String baseDir;

    @Override
    public ToolExecutionResult execute(Map<String, Object> input) {
        String operation = (String) input.get("operation");
        String path = (String) input.get("path");
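        // Normalize the base directory too, so the containment check compares like with like.
        Path basePath = Paths.get(baseDir).toAbsolutePath().normalize();
        Path resolvedPath = basePath.resolve(path).normalize();

        if (!resolvedPath.startsWith(basePath)) {
            throw new SecurityException("Path traversal detected");
        }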

        return switch (operation) {
            case "read" -> readFile(resolvedPath);
            case "list" -> listDirectory(resolvedPath);
            case "write" -> writeFile(resolvedPath, (String) input.get("content"));
            default -> throw new IllegalArgumentException("Unknown operation: " + operation);
        };
    }

    @Override
    public String getName() { return "file_operation"; }

    @Override
    public String getDescription() { return "Read, list, or write files within allowed directory"; }
}

Tool Registration Configuration

@Configuration
public class ToolRegistryConfig {

    @Bean
    public ToolRegistry toolRegistry(
            HttpToolExecutor httpTool,
            DatabaseToolExecutor dbTool,
            FileToolExecutor fileTool) {

        ToolRegistry registry = new ToolRegistry();

        registry.register(ToolDescriptor.builder()
            .name("http_request")
            .description("Execute HTTP request to external API")
            .inputSchema("""
                {
                  "type": "object",
                  "properties": {
                    "url": {"type": "string", "format": "uri"},
                    "method": {"type": "string", "enum": ["GET","POST","PUT","DELETE"]},
                    "headers": {"type": "object"},
                    "body": {"type": "object"}
                  },
                  "required": ["url"]
                }
                """)
            .build(), httpTool);

        registry.register(ToolDescriptor.builder()
            .name("database_query")
            .description("Execute read-only SQL query")
            .inputSchema("""
                {
                  "type": "object",
                  "properties": {
                    "sql": {"type": "string"},
                    "params": {"type": "array"}
                  },
                  "required": ["sql"]
                }
                """)
            .build(), dbTool);

        registry.register(ToolDescriptor.builder()
            .name("file_operation")
            .description("File operations within allowed directory")
            .inputSchema("""
                {
                  "type": "object",
                  "properties": {
                    "operation": {"type": "string", "enum": ["read","list","write"]},
                    "path": {"type": "string"},
                    "content": {"type": "string"}
                  },
                  "required": ["operation", "path"]
                }
                """)
            .build(), fileTool);

        return registry;
    }
}

State Machine: Managing Agent Execution State

State Definitions and Transitions

                    ┌──────────┐
        create      │  CREATED  │
       ────────►   └──────────┘
                         │ start
                         ▼
                    ┌──────────┐
                    │  RUNNING  │◄────────┐
                    └──────────┘         │
                    │          │ resume   │
              pause │          │ complete │
                    ▼          ▼          │
              ┌──────────┐  ┌──────────┐  │
              │  PAUSED   │  │COMPLETED │  │
              └──────────┘  └──────────┘  │
                    │                      │
                    │ resume               │
                    └──────────────────────┘
                         │ error
                         ▼
                    ┌──────────┐
                    │  FAILED   │
                    └──────────┘

Spring Statemachine Configuration

@Configuration
@EnableStateMachineFactory
public class WorkflowStateMachineConfig
        extends EnumStateMachineConfigurerAdapter<WorkflowState, WorkflowEvent> {

    @Override
    public void configure(StateMachineStateConfigurer<WorkflowState, WorkflowEvent> states)
            throws Exception {
        states
            .withStates()
            .initial(WorkflowState.CREATED)
            .states(EnumSet.allOf(WorkflowState.class))
            .end(WorkflowState.COMPLETED)
            .end(WorkflowState.FAILED);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<WorkflowState, WorkflowEvent> transitions)
            throws Exception {
        transitions
            .withExternal()
                .source(WorkflowState.CREATED).target(WorkflowState.RUNNING)
                .event(WorkflowEvent.START)
            .and()
            .withExternal()
                .source(WorkflowState.RUNNING).target(WorkflowState.PAUSED)
                .event(WorkflowEvent.PAUSE)
            .and()
            .withExternal()
                .source(WorkflowState.PAUSED).target(WorkflowState.RUNNING)
                .event(WorkflowEvent.RESUME)
            .and()
            .withExternal()
                .source(WorkflowState.RUNNING).target(WorkflowState.COMPLETED)
                .event(WorkflowEvent.COMPLETE)
            .and()
            .withExternal()
                .source(WorkflowState.RUNNING).target(WorkflowState.FAILED)
                .event(WorkflowEvent.ERROR)
            .and()
            .withExternal()
                .source(WorkflowState.PAUSED).target(WorkflowState.FAILED)
                .event(WorkflowEvent.ERROR);
    }
}

public enum WorkflowState {
    CREATED, RUNNING, PAUSED, COMPLETED, FAILED
}

public enum WorkflowEvent {
    START, PAUSE, RESUME, COMPLETE, ERROR
}
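
The six transitions configured above amount to a lookup table from (state, event) to the next state. The sketch below expresses the same table in plain Java, which can be handy for unit-testing transition rules without starting Spring Statemachine. WorkflowTransitions and its nested State and Event enums are hypothetical copies made for illustration, not the engine's own classes.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

final class WorkflowTransitions {

    enum State { CREATED, RUNNING, PAUSED, COMPLETED, FAILED }
    enum Event { START, PAUSE, RESUME, COMPLETE, ERROR }

    // (state, event) -> next state; any pair not listed is an illegal transition.
    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    static {
        allow(State.CREATED, Event.START, State.RUNNING);
        allow(State.RUNNING, Event.PAUSE, State.PAUSED);
        allow(State.PAUSED, Event.RESUME, State.RUNNING);
        allow(State.RUNNING, Event.COMPLETE, State.COMPLETED);
        allow(State.RUNNING, Event.ERROR, State.FAILED);
        allow(State.PAUSED, Event.ERROR, State.FAILED);
    }

    private static void allow(State from, Event on, State to) {
        TABLE.computeIfAbsent(from, k -> new EnumMap<>(Event.class)).put(on, to);
    }

    /** Returns the target state, or empty when the event is not allowed in the current state. */
    static Optional<State> next(State current, Event event) {
        Map<Event, State> row = TABLE.get(current);
        return row == null ? Optional.empty() : Optional.ofNullable(row.get(event));
    }
}
```

Because COMPLETED and FAILED have no outgoing rows, every event sent to a finished workflow yields an empty result, which corresponds to the two end states in the configuration.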

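Redis Checkpoint Persistence

import java.time.Duration;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component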
public class RedisCheckpointManager implements CheckpointManager {

    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;
    private static final String KEY_PREFIX = "workflow:checkpoint:";

    @Override
    public void saveCheckpoint(WorkflowContext context) {
        String key = KEY_PREFIX + context.getExecutionId();
        try {
            String serialized = objectMapper.writeValueAsString(context);
            redisTemplate.opsForValue().set(key, serialized, Duration.ofHours(24));
            log.info("Checkpoint saved for execution: {}", context.getExecutionId());
        } catch (JsonProcessingException e) {
            throw new WorkflowException("Failed to save checkpoint", e);
        }
    }

    @Override
    public Optional<WorkflowContext> loadCheckpoint(String executionId) {
        String key = KEY_PREFIX + executionId;
        String serialized = redisTemplate.opsForValue().get(key);
        if (serialized == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(objectMapper.readValue(serialized, WorkflowContext.class));
        } catch (JsonProcessingException e) {
            throw new WorkflowException("Failed to load checkpoint", e);
        }
    }

    @Override
    public void deleteCheckpoint(String executionId) {
        redisTemplate.delete(KEY_PREFIX + executionId);
    }
}

Resuming Execution from a Checkpoint

@Service
public class WorkflowRecoveryService {

    private final RedisCheckpointManager checkpointManager;
    private final DAGScheduler dagScheduler;
    private final WorkflowDefinitionRegistry definitionRegistry;

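    // Read-only lookup used by the status endpoint in WorkflowController.
    public Optional<WorkflowContext> getCheckpoint(String executionId) {
        return checkpointManager.loadCheckpoint(executionId);
    }

    public WorkflowResult recoverFromCheckpoint(String executionId) {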
        WorkflowContext context = checkpointManager.loadCheckpoint(executionId)
            .orElseThrow(() -> new WorkflowException(
                "No checkpoint found for: " + executionId));

        WorkflowDefinition workflow = definitionRegistry.get(context.getWorkflowId());

        log.info("Recovering workflow {} from checkpoint, current state: {}",
            context.getWorkflowId(), context.getCurrentState());

        Set<String> completedNodes = context.getNodeResults().keySet();

        WorkflowDefinition remainingWorkflow = filterRemainingNodes(workflow, completedNodes);

        return dagScheduler.execute(remainingWorkflow, context).join();
    }

    private WorkflowDefinition filterRemainingNodes(
            WorkflowDefinition workflow, Set<String> completedNodes) {
        List<NodeDefinition> remainingNodes = workflow.getNodes().stream()
            .filter(n -> !completedNodes.contains(n.getNodeId()))
            .toList();

        List<EdgeDefinition> remainingEdges = workflow.getEdges().stream()
            .filter(e -> !completedNodes.contains(e.getSourceId()))
            .filter(e -> !completedNodes.contains(e.getTargetId()))
            .toList();

        return WorkflowDefinition.builder()
            .workflowId(workflow.getWorkflowId())
            .nodes(remainingNodes)
            .edges(remainingEdges)
            .build();
    }
}
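
A worked example makes the effect of filterRemainingNodes easier to check. The sketch below applies the same filtering rule to a graph of plain string IDs: a node is kept if it has no recorded result, and an edge is kept only if both of its endpoints are kept. RemainingGraph, Edge, and Graph are hypothetical types introduced for this illustration.

```java
import java.util.List;
import java.util.Set;

final class RemainingGraph {

    record Edge(String source, String target) {}
    record Graph(List<String> nodes, List<Edge> edges) {}

    /** Returns the sub-graph that still has to run after the given nodes have completed. */
    static Graph remaining(Graph full, Set<String> completed) {
        List<String> nodes = full.nodes().stream()
            .filter(n -> !completed.contains(n))
            .toList();
        // Edges touching a completed node are dropped: the dependency is already satisfied,
        // so it must not hold back the remaining nodes during level assignment.
        List<Edge> edges = full.edges().stream()
            .filter(e -> !completed.contains(e.source()) && !completed.contains(e.target()))
            .toList();
        return new Graph(nodes, edges);
    }
}
```

With the customer-service graph and classify_intent and query_order already completed, the remaining graph holds query_knowledge, aggregate, and generate_response, connected by query_knowledge → aggregate and aggregate → generate_response. The scheduler then treats query_knowledge as the new level 0.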

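Multi-Agent Collaboration Modes

Comparison of the Four Collaboration Modes

Mode | Best suited for | Concurrency | Latency | Consistency | Typical use cases
Sequential | Steps with strict dependencies | 1 | | | Approval flows, pipelines
Parallel | Independent tasks that can run concurrently | N | | Eventual | Multi-source data aggregation
Router | Conditional branch routing | 1 | | | Intent dispatch, domain routing
Debate | Decisions that need multiple perspectives | N | | Voting/weighted | Code review, decision justification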

Collaboration Mode Interface

public interface AgentCollaborationMode {
    WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context);
    String getModeName();
}

Sequential Mode

@Component
public class SequentialMode implements AgentCollaborationMode {

    private final AgentExecutor agentExecutor;

    @Override
    public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
        Object currentOutput = context.getVariable("input", Object.class);

        for (AgentDefinition agent : agents) {
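            // Guard against a missing "input" variable or an agent that returned null.
            log.info("Sequential execution: agent={}, input type={}",
                agent.getName(),
                currentOutput == null ? "null" : currentOutput.getClass().getSimpleName());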

            AgentResult result = agentExecutor.execute(agent, currentOutput, context);
            currentOutput = result.getOutput();
            context.setVariable("agent_" + agent.getName() + "_output", currentOutput);
        }

        context.setVariable("output", currentOutput);
        return WorkflowResult.success(context);
    }

    @Override
    public String getModeName() { return "sequential"; }
}

Parallel Mode

@Component
public class ParallelMode implements AgentCollaborationMode {

    private final AgentExecutor agentExecutor;

    @Override
    public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
        Object input = context.getVariable("input", Object.class);

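        // Note: supplyAsync without an explicit executor runs on ForkJoinPool.commonPool();
        // blocking LLM calls are usually better served by a dedicated executor, and the
        // shared WorkflowContext must tolerate concurrent writes.
        List<CompletableFuture<AgentResult>> futures = agents.stream()
            .map(agent -> CompletableFuture.supplyAsync(
                () -> agentExecutor.execute(agent, input, context)))
            .toList();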

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        Map<String, Object> mergedOutput = new LinkedHashMap<>();
        for (int i = 0; i < agents.size(); i++) {
            AgentResult result = futures.get(i).join();
            mergedOutput.put(agents.get(i).getName(), result.getOutput());
        }

        context.setVariable("output", mergedOutput);
        return WorkflowResult.success(context);
    }

    @Override
    public String getModeName() { return "parallel"; }
}

Router Mode

@Component
public class RouterMode implements AgentCollaborationMode {

    private final AgentExecutor agentExecutor;
    private final ChatClient chatClient;

    @Override
    public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
        Object input = context.getVariable("input", Object.class);

        String agentNames = agents.stream()
            .map(AgentDefinition::getName)
            .collect(Collectors.joining(", "));

        String routingPrompt = """
            Based on the input, select the most appropriate agent from: %s
            Input: %s
            Return ONLY the agent name, nothing else.
            """.formatted(agentNames, input);

        String selectedAgent = chatClient.prompt()
            .user(routingPrompt)
            .call()
            .content();

        AgentDefinition agent = agents.stream()
            .filter(a -> a.getName().equals(selectedAgent.trim()))
            .findFirst()
            .orElse(agents.get(0));

        log.info("Router selected agent: {}", agent.getName());

        AgentResult result = agentExecutor.execute(agent, input, context);
        context.setVariable("output", result.getOutput());
        return WorkflowResult.success(context);
    }

    @Override
    public String getModeName() { return "router"; }
}
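
RouterMode compares the model's reply to agent names with an exact equals() and silently falls back to the first agent. Model replies often carry quotes, trailing punctuation, or different casing, so a slightly more tolerant match that reports "no match" explicitly may be preferable. The helper below is a sketch of that idea; AgentNameMatcher and match are hypothetical names.

```java
import java.util.List;
import java.util.Optional;

final class AgentNameMatcher {

    /** Maps a raw model reply to one of the known agent names, or empty if nothing matches. */
    static Optional<String> match(String modelReply, List<String> agentNames) {
        if (modelReply == null) {
            return Optional.empty();
        }
        // Strip whitespace, surrounding quotes/backticks, and a trailing period.
        String cleaned = modelReply.strip().replaceAll("^[\"'`]+|[\"'`.]+$", "");
        return agentNames.stream()
            .filter(name -> name.equalsIgnoreCase(cleaned))
            .findFirst();
    }
}
```

Returning an Optional leaves the fallback decision to the caller: it can retry the routing prompt, pick a default agent, or fail the node, instead of always running agents.get(0).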

Debate Mode

@Component
public class DebateMode implements AgentCollaborationMode {

    private final AgentExecutor agentExecutor;

    @Value("${workflow.debate.rounds:3}")
    private int debateRounds;

    @Override
    public WorkflowResult execute(List<AgentDefinition> agents, WorkflowContext context) {
        Object topic = context.getVariable("input", Object.class);
        List<DebateRound> rounds = new ArrayList<>();

        for (int round = 0; round < debateRounds; round++) {
            log.info("Debate round {}/{}", round + 1, debateRounds);

            List<AgentResult> roundResults = agents.stream()
                .map(agent -> agentExecutor.execute(agent, topic, context))
                .toList();

            DebateRound debateRound = new DebateRound(round, roundResults);
            rounds.add(debateRound);

            topic = buildDebateContext(rounds);
        }

        Object finalDecision = voteOnResults(rounds);
        context.setVariable("debate_rounds", rounds);
        context.setVariable("output", finalDecision);
        return WorkflowResult.success(context);
    }

    private Object voteOnResults(List<DebateRound> rounds) {
        Map<String, Integer> voteCount = new HashMap<>();
        for (DebateRound round : rounds) {
            for (AgentResult result : round.results()) {
                String conclusion = result.getOutput().toString();
                voteCount.merge(conclusion, 1, Integer::sum);
            }
        }
        return voteCount.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .orElse("No consensus reached");
    }

    @Override
    public String getModeName() { return "debate"; }
}
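
voteOnResults counts exact output strings, so "Approve" and "approve " are tallied as different conclusions, and a tie is resolved arbitrarily by max(). The sketch below shows one possible refinement under simple assumptions: conclusions are short labels, normalization is trimming plus lower-casing, and a tie means no consensus. MajorityVote and winner are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

final class MajorityVote {

    /** Returns the single most frequent conclusion, or empty on a tie or when there are no votes. */
    static Optional<String> winner(List<String> conclusions) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String conclusion : conclusions) {
            // Normalize so that casing and stray whitespace do not split the vote.
            counts.merge(conclusion.strip().toLowerCase(Locale.ROOT), 1, Integer::sum);
        }
        int best = counts.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        List<String> top = counts.entrySet().stream()
            .filter(e -> e.getValue() == best)
            .map(Map.Entry::getKey)
            .toList();
        return top.size() == 1 ? Optional.of(top.get(0)) : Optional.empty();
    }
}
```

For free-text agent output this only works if agents are prompted to end with a constrained label (for example approve or reject); voting on whole paragraphs will almost never produce two identical strings.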

Deep Integration with Spring AI

Workflow DSL Definition

@Component
public class CustomerServiceWorkflow {

    private final WorkflowEngine workflowEngine;
    private final ToolRegistry toolRegistry;

    public WorkflowDefinition build() {
        return workflowEngine.workflow("customer-service", "1.0")
            .node("classify_intent")
                .type("llm")
                .property("prompt", "Classify the user's intent: order query / refund / complaint / transfer to a human agent")
                .property("model", "gpt-4o")
            .node("query_order")
                .type("tool")
                .property("toolName", "database_query")
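                // JdbcTemplate (used by DatabaseToolExecutor) binds positional "?" parameters,
                // not named ones such as :orderId.
                .property("sql", "SELECT * FROM orders WHERE order_id = ?")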
            .node("query_knowledge")
                .type("tool")
                .property("toolName", "http_request")
                .property("url", "http://knowledge-service/api/search")
            .node("aggregate")
                .type("transform")
                .property("expression", "merge(orderData, knowledgeData)")
            .node("generate_response")
                .type("llm")
                .property("prompt", "Generate a customer service reply based on the following information: {{aggregatedData}}")
            .edge("classify_intent", "query_order")
                .condition("intent == 'query_order'")
            .edge("classify_intent", "query_knowledge")
                .condition("intent == 'knowledge_query'")
            .edge("query_order", "aggregate")
            .edge("query_knowledge", "aggregate")
            .edge("aggregate", "generate_response")
            .build();
    }
}
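
The edge conditions in this DSL are plain strings such as intent == 'query_order', so something has to evaluate them against the workflow variables before a conditional edge is followed. A minimal sketch of such an evaluator is shown below. It supports only `name == 'literal'` and `name != 'literal'`, treats a missing condition as "always follow", and is an assumption about how conditions could be handled; SimpleCondition and evaluate are hypothetical names, and a real engine might delegate to an expression language such as SpEL instead.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SimpleCondition {

    // Matches: <variable> == '<literal>'  or  <variable> != '<literal>'
    private static final Pattern COMPARISON =
        Pattern.compile("^\\s*(\\w+)\\s*(==|!=)\\s*'([^']*)'\\s*$");

    /** Evaluates an edge condition against the workflow variables. */
    static boolean evaluate(String condition, Map<String, Object> variables) {
        if (condition == null || condition.isBlank()) {
            return true;                       // unconditional edge
        }
        Matcher m = COMPARISON.matcher(condition);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported condition: " + condition);
        }
        Object value = variables.get(m.group(1));
        // A missing variable never equals a literal.
        boolean equal = value != null && m.group(3).equals(value.toString());
        return m.group(2).equals("==") ? equal : !equal;
    }
}
```

With the variable intent set to query_order, the condition on the classify_intent → query_order edge evaluates to true and the one on the classify_intent → query_knowledge edge to false, so only the order branch would be scheduled.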

Spring AI ChatClient Integration Node

@Component
public class LLMNodeExecutor implements NodeExecutor {

    private final ChatClient chatClient;
    private final ToolRegistry toolRegistry;

    @Override
    public CompletableFuture<NodeResult> execute(
            NodeDefinition node, WorkflowContext context) {

        String promptTemplate = (String) node.getProperties().get("prompt");
        String model = (String) node.getProperties().getOrDefault("model", "gpt-4o");
        boolean useTools = (boolean) node.getProperties().getOrDefault("useTools", false);

        String resolvedPrompt = resolveVariables(promptTemplate, context);

        ChatClient.ChatClientRequestSpec request = chatClient.prompt()
            .user(resolvedPrompt)
            .options(ChatOptionsBuilder.builder()
                .withModel(model)
                .withTemperature(0.1)
                .build());

        if (useTools) {
            List<Map<String, Object>> toolSchemas = toolRegistry.getOpenAIToolSchemas();
            request.tools(toolSchemas);
        }

        String response = request.call().content();

        NodeResult result = NodeResult.builder()
            .nodeId(node.getNodeId())
            .success(true)
            .output(response)
            .timestamp(Instant.now())
            .build();

        return CompletableFuture.completedFuture(result);
    }

    private String resolveVariables(String template, WorkflowContext context) {
        Pattern pattern = Pattern.compile("\\{\\{(\\w+)}}");
        Matcher matcher = pattern.matcher(template);
        StringBuilder sb = new StringBuilder();
        while (matcher.find()) {
            Object value = context.getVariable(matcher.group(1), Object.class);
            // quoteReplacement keeps '$' and '\' in variable values from being
            // interpreted as group references by appendReplacement.
            matcher.appendReplacement(sb,
                Matcher.quoteReplacement(value != null ? value.toString() : ""));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    @Override
    public String getSupportedType() { return "llm"; }
}

Exposing the Workflow API through a REST Controller

@RestController
@RequestMapping("/api/v1/workflows")
public class WorkflowController {

    private final WorkflowEngine workflowEngine;
    private final WorkflowRecoveryService recoveryService;

    @PostMapping("/{workflowId}/execute")
    public ResponseEntity<WorkflowExecutionResponse> executeWorkflow(
            @PathVariable String workflowId,
            @RequestBody WorkflowExecutionRequest request) {

        WorkflowContext context = WorkflowContext.builder()
            .workflowId(workflowId)
            .executionId(UUID.randomUUID().toString())
            .variables(request.getInputs())
            .build();

        CompletableFuture<WorkflowResult> future = workflowEngine.execute(workflowId, context);

        return ResponseEntity.accepted()
            .body(WorkflowExecutionResponse.builder()
                .executionId(context.getExecutionId())
                .status(WorkflowState.RUNNING)
                .build());
    }

    @GetMapping("/{workflowId}/executions/{executionId}")
    public ResponseEntity<WorkflowExecutionResponse> getExecutionStatus(
            @PathVariable String workflowId,
            @PathVariable String executionId) {

        Optional<WorkflowContext> checkpoint = recoveryService.getCheckpoint(executionId);

        return checkpoint.map(ctx -> ResponseEntity.ok(
                WorkflowExecutionResponse.builder()
                    .executionId(executionId)
                    .status(ctx.getCurrentState())
                    .nodeResults(ctx.getNodeResults())
                    .build()))
            .orElse(ResponseEntity.notFound().build());
    }

    @PostMapping("/{workflowId}/executions/{executionId}/resume")
    public ResponseEntity<WorkflowExecutionResponse> resumeWorkflow(
            @PathVariable String workflowId,
            @PathVariable String executionId) {

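        // Blocks until the resumed run finishes. The response below always reports
        // COMPLETED; a production version would derive the status from `result`.
        WorkflowResult result = recoveryService.recoverFromCheckpoint(executionId);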

        return ResponseEntity.ok(WorkflowExecutionResponse.builder()
            .executionId(executionId)
            .status(WorkflowState.COMPLETED)
            .build());
    }
}
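
The execute endpoint returns before the workflow finishes, so something has to move the stored status from RUNNING to a terminal state once the future completes. The following is a minimal in-memory sketch of that bookkeeping; `ExecutionRegistry` and its methods are hypothetical names, and the article's engine persists state through checkpoints rather than a map.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class ExecutionRegistry {
    enum Status { RUNNING, COMPLETED, FAILED }

    private final ConcurrentHashMap<String, Status> statuses = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CompletableFuture<?>> futures = new ConcurrentHashMap<>();

    /** Starts the work asynchronously and returns the execution id immediately. */
    <T> String submit(Supplier<T> work) {
        String executionId = UUID.randomUUID().toString();
        statuses.put(executionId, Status.RUNNING);
        CompletableFuture<T> future = CompletableFuture.supplyAsync(work)
            // Runs when the work ends, whether it returned or threw
            .whenComplete((result, error) ->
                statuses.put(executionId, error == null ? Status.COMPLETED : Status.FAILED));
        futures.put(executionId, future);
        return executionId;
    }

    /** Empty for unknown ids, which a controller would map to 404. */
    Optional<Status> status(String executionId) {
        return Optional.ofNullable(statuses.get(executionId));
    }

    /** Test helper: waits for the execution to end without rethrowing its failure. */
    void await(String executionId) {
        CompletableFuture<?> future = futures.get(executionId);
        if (future != null) {
            future.handle((result, error) -> null).join();
        }
    }
}
```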

Workflow Visualization and Debugging

Execution Trace Model

public class ExecutionTrace {
    private String executionId;
    private List<NodeTrace> nodeTraces;
    private Instant startTime;
    private Instant endTime;
    private long totalDurationMs;
    private WorkflowState finalState;
}

public class NodeTrace {
    private String nodeId;
    private String nodeName;
    private Instant startTime;
    private Instant endTime;
    private long durationMs;
    private NodeStatus status;
    private String errorMessage;
    private Map<String, Object> inputSnapshot;
    private Map<String, Object> outputSnapshot;
    private List<ToolCallTrace> toolCalls;
}

public class ToolCallTrace {
    private String toolName;
    private Instant callTime;
    private long durationMs;
    private Map<String, Object> arguments;
    private Object result;
}
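
Once node traces carry start and end timestamps, simple debugging questions (which node was slowest, how long the run took end to end) become short stream queries. The sketch below uses a trimmed, hypothetical `NodeSpan` record in place of `NodeTrace`, keeping only the fields it needs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class TraceSummary {
    // Reduced stand-in for NodeTrace: id plus start and end timestamps
    record NodeSpan(String nodeId, Instant startTime, Instant endTime) {
        long durationMs() {
            return Duration.between(startTime, endTime).toMillis();
        }
    }

    /** The node with the longest individual duration, if any spans exist. */
    static Optional<NodeSpan> slowest(List<NodeSpan> spans) {
        return spans.stream().max(Comparator.comparingLong(NodeSpan::durationMs));
    }

    /**
     * Time from the earliest start to the latest end. Parallel nodes overlap,
     * so this can be smaller than the sum of the node durations.
     */
    static long wallClockMs(List<NodeSpan> spans) {
        Instant first = spans.stream().map(NodeSpan::startTime)
            .min(Comparator.naturalOrder()).orElseThrow();
        Instant last = spans.stream().map(NodeSpan::endTime)
            .max(Comparator.naturalOrder()).orElseThrow();
        return Duration.between(first, last).toMillis();
    }
}
```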

Audit Logging

@Aspect
@Component
public class WorkflowAuditAspect {

    private final AuditLogRepository auditLogRepository;

    @Around("execution(* com.toolsku.workflow..*Executor.execute(..))")
    public Object auditNodeExecution(ProceedingJoinPoint joinPoint) throws Throwable {
        NodeDefinition node = (NodeDefinition) joinPoint.getArgs()[0];
        WorkflowContext context = (WorkflowContext) joinPoint.getArgs()[1];

        AuditLog auditLog = AuditLog.builder()
            .executionId(context.getExecutionId())
            .nodeId(node.getNodeId())
            .nodeType(node.getType())
            .startTime(Instant.now())
            .build();

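        try {
            // Executors return a CompletableFuture, so SUCCESS here means the node
            // was dispatched without throwing, not that it has already finished.
            Object result = joinPoint.proceed();
            auditLog.setStatus("SUCCESS");
            return result;
        } catch (Throwable t) {
            // proceed() is declared to throw Throwable; catching it audits Errors too
            auditLog.setStatus("FAILED");
            auditLog.setErrorMessage(t.getMessage());
            throw t;
        } finally {
            auditLog.setEndTime(Instant.now());
            auditLogRepository.save(auditLog);
        }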
    }
}

Generating Visualization Data

@Service
public class WorkflowVisualizationService {

    public WorkflowGraphData generateGraphData(WorkflowDefinition workflow, ExecutionTrace trace) {
        List<GraphNode> nodes = workflow.getNodes().stream()
            .map(nodeDef -> {
                NodeTrace nodeTrace = trace != null ?
                    trace.getNodeTraces().stream()
                        .filter(t -> t.getNodeId().equals(nodeDef.getNodeId()))
                        .findFirst().orElse(null) : null;

                return GraphNode.builder()
                    .id(nodeDef.getNodeId())
                    .label(nodeDef.getNodeId())
                    .type(nodeDef.getType())
                    .status(nodeTrace != null ? nodeTrace.getStatus() : NodeStatus.PENDING)
                    .durationMs(nodeTrace != null ? nodeTrace.getDurationMs() : 0)
                    .build();
            })
            .toList();

        List<GraphEdge> edges = workflow.getEdges().stream()
            .map(edgeDef -> GraphEdge.builder()
                .source(edgeDef.getSourceId())
                .target(edgeDef.getTargetId())
                .condition(edgeDef.getCondition())
                .type(edgeDef.getType())
                .build())
            .toList();

        return WorkflowGraphData.builder()
            .nodes(nodes)
            .edges(edges)
            .build();
    }
}
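
For quick debugging without a front end, the same node and edge data can be rendered as Mermaid flowchart text. This is a sketch under stated assumptions: `Node` and `Edge` are simplified, hypothetical stand-ins for `GraphNode` and `GraphEdge`, node ids are plain alphanumeric identifiers, and conditions contain no `|` or quote characters.

```java
import java.util.List;

final class MermaidExporter {
    record Node(String id, String status, long durationMs) {}
    record Edge(String source, String target, String condition) {}

    /** Renders a top-down Mermaid flowchart, one line per node and per edge. */
    static String toMermaid(List<Node> nodes, List<Edge> edges) {
        StringBuilder sb = new StringBuilder("graph TD\n");
        for (Node n : nodes) {
            // Node label shows the id, its status and its duration
            sb.append("  ").append(n.id())
              .append("[\"").append(n.id())
              .append(" (").append(n.status())
              .append(", ").append(n.durationMs()).append(" ms)\"]\n");
        }
        for (Edge e : edges) {
            sb.append("  ").append(e.source());
            if (e.condition() != null && !e.condition().isBlank()) {
                // Conditional edges carry the condition as an edge label
                sb.append(" -->|").append(e.condition()).append("| ");
            } else {
                sb.append(" --> ");
            }
            sb.append(e.target()).append('\n');
        }
        return sb.toString();
    }
}
```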

Fault Tolerance and Retry

Defining the RetryPolicy

public class RetryPolicy {
    private int maxAttempts = 3;
    private Duration initialDelay = Duration.ofMillis(500);
    private double multiplier = 2.0;
    private Duration maxDelay = Duration.ofSeconds(30);
    private Set<Class<? extends Throwable>> retryableExceptions = Set.of(
        ToolExecutionException.class,
        ModelTimeoutException.class,
        RateLimitExceededException.class
    );
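    private boolean exponentialBackoff = true;

    // Used by RetryableNodeExecutor when a node defines no policy of its own
    public static RetryPolicy defaultPolicy() {
        return new RetryPolicy();
    }

    // Getters (getMaxAttempts(), isExponentialBackoff(), ...) omitted for brevity
}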
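
To see what these defaults mean in practice, the delay before each retry can be computed as `initialDelay * multiplier^(attempt - 1)`, capped at `maxDelay`. The helper below is a hypothetical stand-alone sketch of that formula.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    /** Delay to wait after the given failed attempt (1-based), capped at maxDelay. */
    static Duration delayAfterAttempt(int attempt, Duration initialDelay,
                                      double multiplier, Duration maxDelay) {
        double raw = initialDelay.toMillis() * Math.pow(multiplier, attempt - 1);
        long capped = (long) Math.min(raw, (double) maxDelay.toMillis());
        return Duration.ofMillis(capped);
    }

    /** Delays in milliseconds for the first `count` failed attempts. */
    static List<Long> scheduleMillis(int count, Duration initialDelay,
                                     double multiplier, Duration maxDelay) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= count; attempt++) {
            delays.add(delayAfterAttempt(attempt, initialDelay, multiplier, maxDelay).toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        // 500 ms initial delay, doubling each time, capped at 30 s
        System.out.println(scheduleMillis(7, Duration.ofMillis(500), 2.0, Duration.ofSeconds(30)));
    }
}
```

With the default `maxAttempts = 3`, only the first two delays (500 ms and 1000 ms) are ever used; the 30-second cap matters only when a node raises its attempt limit.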

RetryableNodeExecutor

@Component
public class RetryableNodeExecutor implements NodeExecutor {

    private final Map<String, NodeExecutor> executorMap;
    private final MetricsCollector metricsCollector;

    @Override
    public CompletableFuture<NodeResult> execute(
            NodeDefinition node, WorkflowContext context) {

        NodeExecutor delegate = executorMap.get(node.getType());
        RetryPolicy policy = node.getRetryPolicy() != null ?
            node.getRetryPolicy() : RetryPolicy.defaultPolicy();

        return CompletableFuture.supplyAsync(() -> {
            int attempt = 0;
            Throwable lastException = null;

            while (attempt < policy.getMaxAttempts()) {
                attempt++;
                try {
                    NodeResult result = delegate.execute(node, context).join();
                    metricsCollector.recordRetrySuccess(node.getNodeId(), attempt);
                    return result;
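                } catch (Throwable e) {
                    // join() wraps failures in java.util.concurrent.CompletionException;
                    // unwrap so the policy is matched against the real cause
                    Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                        ? e.getCause() : e;
                    lastException = cause;
                    if (!isRetryable(cause, policy)) {
                        throw new WorkflowExecutionException(
                            "Non-retryable exception", cause);
                    }
                    if (attempt < policy.getMaxAttempts()) {
                        Duration delay = calculateDelay(attempt, policy);
                        log.warn("Node {} failed (attempt {}/{}), retrying in {}ms: {}",
                            node.getNodeId(), attempt, policy.getMaxAttempts(),
                            delay.toMillis(), cause.getMessage());
                        sleep(delay);
                    }
                }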
            }

            metricsCollector.recordRetryExhausted(node.getNodeId(), attempt);
            throw new WorkflowExecutionException(
                "Node " + node.getNodeId() + " failed after " + attempt + " attempts",
                lastException);
        });
    }

    private Duration calculateDelay(int attempt, RetryPolicy policy) {
        if (!policy.isExponentialBackoff()) {
            return policy.getInitialDelay();
        }
        long delayMs = (long) (policy.getInitialDelay().toMillis()
            * Math.pow(policy.getMultiplier(), attempt - 1));
        delayMs = Math.min(delayMs, policy.getMaxDelay().toMillis());
        return Duration.ofMillis(delayMs);
    }

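    private boolean isRetryable(Throwable e, RetryPolicy policy) {
        return policy.getRetryableExceptions().stream()
            .anyMatch(exType -> exType.isInstance(e));
    }

    // Blocks the worker thread between attempts and preserves the interrupt flag
    private void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new WorkflowExecutionException("Interrupted while waiting to retry", ie);
        }
    }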

    @Override
    public String getSupportedType() { return "retryable_wrapper"; }
}
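
The executor above sleeps on a pool thread between attempts. As an alternative, here is a minimal non-blocking sketch that schedules the next attempt with `CompletableFuture.delayedExecutor` (Java 9+). `AsyncRetry` is a hypothetical helper, not part of the engine, and it deliberately omits the retryable-exception filter and the maximum-delay cap to stay short.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class AsyncRetry {
    /** Retries the async action up to maxAttempts times with exponential backoff. */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action,
                                          int maxAttempts, Duration initialDelay,
                                          double multiplier) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, 1, maxAttempts, initialDelay.toMillis(), multiplier, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> action, int attemptNo,
                                    int maxAttempts, long delayMs, double multiplier,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> stage;
        try {
            stage = action.get();
        } catch (RuntimeException e) {
            // Treat a synchronous throw like a failed future
            stage = CompletableFuture.failedFuture(e);
        }
        stage.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            if (attemptNo >= maxAttempts) {
                result.completeExceptionally(cause);
                return;
            }
            // Schedule the next attempt without parking a thread in sleep()
            Executor later = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
            later.execute(() -> attempt(action, attemptNo + 1, maxAttempts,
                (long) (delayMs * multiplier), multiplier, result));
        });
    }
}
```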

CircuitBreaker Integration

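@Configuration
// Named differently from Resilience4j's CircuitBreakerConfig so that the
// references inside this class resolve to the library type, not to this class.
public class WorkflowCircuitBreakerConfig {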

    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .slowCallRateThreshold(80)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .slowCallDurationThreshold(Duration.ofSeconds(5))
            .permittedNumberOfCallsInHalfOpenState(3)
            .slidingWindowSize(10)
            .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
            .build();

        return CircuitBreakerRegistry.of(config);
    }

    @Bean
    public CircuitBreaker llmCircuitBreaker(CircuitBreakerRegistry registry) {
        return registry.circuitBreaker("llm-calls");
    }
}
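
To make the settings above concrete, here is a deliberately tiny count-based breaker written from scratch. It is not Resilience4j's implementation, only a sketch of the same idea: it trips when the failure rate over the last `windowSize` calls reaches the threshold, rejects calls while open, and lets a single trial call through after the wait period (the real library permits a configurable number of trial calls and also tracks slow calls). All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class TinyCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;
    private final double failureRateThreshold; // percent, e.g. 50.0
    private final long openMillis;
    private final LongSupplier clock;          // injectable so tests control time
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAt;

    TinyCircuitBreaker(int windowSize, double failureRateThreshold,
                       long openMillis, LongSupplier clock) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN; // wait period over: allow a trial call
        }
        return state;
    }

    <T> T call(Supplier<T> action) {
        if (state() == State.OPEN) {
            throw new IllegalStateException("circuit open");
        }
        try {
            T value = action.get();
            onResult(false);
            return value;
        } catch (RuntimeException e) {
            onResult(true);
            throw e;
        }
    }

    private synchronized void onResult(boolean failed) {
        if (state == State.HALF_OPEN) {
            // The trial call alone decides whether to close or re-open
            window.clear();
            state = failed ? State.OPEN : State.CLOSED;
            if (failed) openedAt = clock.getAsLong();
            return;
        }
        window.addLast(failed);
        if (window.size() > windowSize) window.removeFirst();
        if (window.size() == windowSize) {
            long failures = window.stream().filter(f -> f).count();
            if (failures * 100.0 / windowSize >= failureRateThreshold) {
                state = State.OPEN;
                openedAt = clock.getAsLong();
                window.clear();
            }
        }
    }
}
```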

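Production Deployment and Performance Tuning

Metrics Checklist

| Category | Metric | Type | Description |
|---|---|---|---|
| Execution | workflow_execution_total | Counter | Total workflow executions |
| Execution | workflow_execution_duration | Histogram | Distribution of execution time |
| Execution | workflow_execution_active | Gauge | Executions currently in flight |
| Node | node_execution_total | Counter | Total node executions |
| Node | node_execution_duration | Histogram | Distribution of node execution time |
| Node | node_execution_errors | Counter | Node errors |
| Tool | tool_call_total | Counter | Total tool calls |
| Tool | tool_call_duration | Histogram | Tool call duration |
| LLM | llm_token_input | Counter | Input tokens consumed |
| LLM | llm_token_output | Counter | Output tokens consumed |
| LLM | llm_request_latency | Histogram | LLM request latency |
| Retry | retry_attempts_total | Counter | Total retry attempts |
| Retry | retry_exhausted_total | Counter | Times retries were exhausted |

The collector code registers these meters under Micrometer's dotted names (for example `workflow.execution.total`); the table lists them in the snake_case form used on the Prometheus side.

Collecting Metrics with Micrometer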

@Component
public class WorkflowMetricsCollector {

    private final MeterRegistry meterRegistry;

    public void recordWorkflowExecution(String workflowId, Duration duration, boolean success) {
        Counter.builder("workflow.execution.total")
            .tag("workflow", workflowId)
            .tag("success", String.valueOf(success))
            .register(meterRegistry).increment();

        Timer.builder("workflow.execution.duration")
            .tag("workflow", workflowId)
            .register(meterRegistry)
            .record(duration);
    }

    public void recordNodeExecution(String nodeId, String nodeType,
                                     Duration duration, boolean success) {
        Counter.builder("node.execution.total")
            .tag("node", nodeId)
            .tag("type", nodeType)
            .tag("success", String.valueOf(success))
            .register(meterRegistry).increment();

        Timer.builder("node.execution.duration")
            .tag("node", nodeId)
            .tag("type", nodeType)
            .register(meterRegistry)
            .record(duration);
    }

    public void recordTokenUsage(String model, int inputTokens, int outputTokens) {
        Counter.builder("llm.token.input")
            .tag("model", model)
            .register(meterRegistry).increment(inputTokens);

        Counter.builder("llm.token.output")
            .tag("model", model)
            .register(meterRegistry).increment(outputTokens);
    }
}

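Performance Optimization Checklist

| Optimization | Before | After | Method |
|---|---|---|---|
| LLM call latency | 2.5 s per call | 0.8 s per call | Semantic cache (matching prompts are served from cache) |
| Workflow startup | 500 ms | 50 ms | Precompiled DAG + object pooling |
| Checkpoint save | 200 ms | 30 ms | Asynchronous batched writes through Redis pipelining |
| Data passing between nodes | Serialization overhead | Zero-copy | Pass by reference + copy-on-write |
| Parallel node scheduling | Sequential submission | True parallelism | ForkJoinPool with work stealing |
| Tool schema generation | Reflection on every call | Once | Generated at startup and cached |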
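
The last row (generate tool schemas once, then reuse them) can be sketched with `ConcurrentHashMap.computeIfAbsent`. The example below is illustrative only: `ToolSchemaCache` and `EchoTool` are hypothetical, and the "schema" is reduced to a method-signature string rather than a real function-calling JSON schema.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

final class ToolSchemaCache {
    private final Map<Class<?>, String> cache = new ConcurrentHashMap<>();
    // Counts how often reflection actually ran, to show the cache working
    final AtomicInteger generations = new AtomicInteger();

    String schemaFor(Class<?> toolClass) {
        // Reflection runs at most once per class; later calls hit the map
        return cache.computeIfAbsent(toolClass, this::generate);
    }

    private String generate(Class<?> toolClass) {
        generations.incrementAndGet();
        return Arrays.stream(toolClass.getDeclaredMethods())
            .filter(m -> Modifier.isPublic(m.getModifiers()) && !m.isSynthetic())
            .sorted(Comparator.comparing(Method::getName))
            .map(m -> m.getName() + "("
                + Arrays.stream(m.getParameterTypes())
                    .map(Class::getSimpleName)
                    .collect(Collectors.joining(","))
                + ")")
            .collect(Collectors.joining(";"));
    }
}

// Hypothetical tool used only to exercise the cache
final class EchoTool {
    public String echo(String text, int times) {
        return text.repeat(times);
    }
}
```

Calling `schemaFor` for every registered tool during startup turns the cache into the "generated at startup" variant from the table.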

Semantic Cache Implementation

@Component
public class SemanticCache {

    private final VectorStore vectorStore;

    @Value("${workflow.cache.similarity-threshold:0.95}")
    private double similarityThreshold;

    public Optional<String> get(String prompt) {
        // The vector store embeds the query itself, so no manual embed() call is needed
        List<Document> results = vectorStore.similaritySearch(
            SearchRequest.builder()
                .query(prompt)
                .topK(1)
                .similarityThreshold(similarityThreshold)
                .build());

        if (!results.isEmpty()) {
            Document doc = results.get(0);
            log.info("Semantic cache hit for prompt (distance: {})",
                doc.getMetadata().get("distance"));
            // The document content is the cached prompt; the answer lives in metadata
            return Optional.ofNullable((String) doc.getMetadata().get("response"));
        }

        return Optional.empty();
    }

    public void put(String prompt, String response) {
        // Index the prompt as content so lookups compare prompt against prompt
        Document doc = new Document(prompt,
            Map.of("response", response, "timestamp", Instant.now().toString()));
        vectorStore.add(List.of(doc));
    }
}
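
What the similarity threshold does is easier to see without a vector database. The toy cache below keeps embeddings in memory and returns a cached response only when the cosine similarity to the closest stored prompt reaches the threshold. It is a sketch with hand-written vectors standing in for real embeddings; `ToySemanticCache` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ToySemanticCache {
    private record Entry(double[] vector, String response) {}

    private final List<Entry> entries = new ArrayList<>();
    private final double threshold;

    ToySemanticCache(double threshold) {
        this.threshold = threshold;
    }

    void put(double[] promptVector, String response) {
        entries.add(new Entry(promptVector.clone(), response));
    }

    /** Returns the response of the most similar stored prompt, if similar enough. */
    Optional<String> get(double[] promptVector) {
        Entry best = null;
        double bestScore = -1.0;
        for (Entry e : entries) {
            double score = cosine(promptVector, e.vector());
            if (score > bestScore) {
                bestScore = score;
                best = e;
            }
        }
        return (best != null && bestScore >= threshold)
            ? Optional.of(best.response())
            : Optional.empty();
    }

    /** Cosine similarity of two vectors of equal length. */
    static double cosine(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }
}
```

A threshold as high as 0.95 keeps hits to near-paraphrases; lowering it raises the hit rate at the cost of occasionally returning an answer to a different question.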

Docker Compose Deployment Configuration

version: '3.8'
services:
  workflow-engine:
    image: toolsku/workflow-engine:1.0.0
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=production
      - SPRING_AI_OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_HOST=redis
      - POSTGRES_HOST=postgres
    depends_on:
      - redis
      - postgres
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '4'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
      interval: 30s
      timeout: 10s
      retries: 3

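  redis:
    image: redis:7-alpine
    # allkeys-lru may evict checkpoint keys under memory pressure; use a dedicated
    # instance or noeviction if checkpoints must never be dropped
    command: redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru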
    ports:
      - "6379:6379"

  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: workflow
      POSTGRES_USER: workflow
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    ports:
      - "5432:5432"

  prometheus:
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"

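Summary: Decision Matrix for Building Your Own vs. Using a Framework

Dimension Self-built engine LangGraph Spring AI Dify
DAG workflows ✅ Full control
State persistence ✅ Redis ⚠️ Build it yourself
Multi-Agent ✅ 4 modes ⚠️ Basic ⚠️
Java ecosystem ✅ Native ❌ Python
Observability ✅ Micrometer ✅ LangSmith ⚠️
Learning curve
Maintenance cost
Customization flexibility ✅ Highest

Recommendations:

  • Small and mid-sized companies / rapid validation → use Dify or Spring AI; don't reinvent the wheel
  • Java stack / deep customization required → build your own engine; the architecture in this article can be applied directly
  • Python teams / existing LangChain experience → LangGraph, which has the most mature ecosystem

The core value of a self-built engine is not being "better than the alternatives" but being fully under your control: when your agents must integrate deeply with internal microservices, legacy systems, and custom middleware, building your own is the only option.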
