AI Agent Workflow DAG Engine: 7 Production Patterns from Task Orchestration to Parallel Execution
Linear Agent Pipelines Are Dead — DAG Is the Ultimate Answer for AI Workflows
Still using input → process → output linear pipelines for your AI Agents? When a task needs 3 Agents researching in parallel, 2 Agents analyzing sequentially, and 1 Agent synthesizing, a linear pipeline can only run those steps one after another and has to hardcode every branch. As of 2026, DAG (Directed Acyclic Graph) workflow engines are a common choice for production agent systems because they turn task dependencies, parallel scheduling, and conditional routing from hardcoded logic into declarative configuration.
Key Takeaways:
- Understand core DAG workflow engine concepts and architecture
- Master 7 production-grade DAG orchestration patterns, from task definition to monitoring
- Complete Python implementation ready for production use
- 5 common pitfalls with solutions, 10 error troubleshooting entries
- Comparison: Custom DAG vs LangGraph vs Prefect
Table of Contents
- DAG Workflow Core Concepts
- Pattern 1: Task Definition and Dependency Graph Construction
- Pattern 2: Topological Sort and Parallel Scheduling
- Pattern 3: Conditional Routing and Branch Merging
- Pattern 4: Error Recovery and Retry Strategies
- Pattern 5: State Persistence and Checkpoint Recovery
- Pattern 6: Dynamic DAG and Subgraph Nesting
- Pattern 7: Production Monitoring and Alerting
- 5 Common Pitfalls and Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Techniques
- Comparison: Custom DAG vs LangGraph vs Prefect
- Recommended Online Tools
- Summary
DAG Workflow Core Concepts
A DAG (Directed Acyclic Graph) is the data structure behind AI Agent workflow engines. Each node represents a task (Agent call, tool execution, data transformation), and each directed edge represents a dependency: the target node cannot start until the source node has finished.
┌──────────────────────────────────────────────────────────────┐
│               DAG Workflow Engine Architecture               │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────┐     ┌─────┐     ┌─────┐                             │
│  │  A  │────▶│  B  │────▶│  D  │  ← Serial dependency        │
│  └──┬──┘     └─────┘     └─────┘                             │
│     │                       ▲                                │
│     │        ┌─────┐        │                                │
│     └───────▶│  C  │────────┘  ← B, C parallel; D waits      │
│              └──┬──┘                                         │
│                 │        ┌─────┐                             │
│                 ├───────▶│  E  │  ← Conditional: C→E or C→F  │
│                 │        └─────┘                             │
│                 │        ┌─────┐                             │
│                 └───────▶│  F  │  ← Alternative branch       │
│                          └─────┘                             │
│                                                              │
│  Core Guarantees:                                            │
│  1. Acyclic: no A→B→C→A circular dependencies                │
│  2. Topological order: at least one valid execution order    │
│  3. Parallelism: independent nodes execute concurrently      │
└──────────────────────────────────────────────────────────────┘
Key Terminology
| Term | Description |
|---|---|
| Node | Execution unit in the workflow (LLM call, tool execution, data transform) |
| Edge | Dependency between nodes; normal or conditional |
| DAG | Directed Acyclic Graph — nodes and edges with no cycles |
| Topological Sort | Algorithm to arrange DAG nodes into a valid execution sequence |
| Level | Length of the longest dependency path from an entry node to this node; nodes at the same level do not depend on each other and can execute in parallel |
| Checkpoint | Snapshot of workflow state for recovery |
| Conditional Routing | Dynamic selection of next node based on runtime state |
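To make the Topological Sort and Level rows concrete, here is a minimal sketch that uses Python's standard-library `graphlib` on the A/B/C/D part of the diagram. The `graph` literal and the `levels` helper are illustrative names and are not part of the engine built in this article.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes it depends on (its predecessors).
graph = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}


def levels(dependencies: dict) -> list:
    """Group nodes into batches whose members can run in parallel."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        # get_ready() returns every node whose predecessors are all done.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(levels(graph))  # [['A'], ['B', 'C'], ['D']]
```

B and C land in the same batch because neither depends on the other, while D has to wait for both.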
Why DAG Beats Linear Pipelines
| Dimension | Linear Pipeline | DAG Workflow |
|---|---|---|
| Parallel Execution | ❌ Serial only | ✅ Independent nodes run concurrently |
| Conditional Branching | ⚠️ Hardcoded if-else | ✅ Declarative conditional edges |
| Error Recovery | ❌ Restart from scratch | ✅ Checkpoint recovery |
| Visualization | ⚠️ Hard to understand | ✅ Graph structure is intuitive |
| Extensibility | ❌ Changes cascade | ✅ Local modifications, global safety |
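The Parallel Execution row can be illustrated with a small timing sketch. The `fake_agent` coroutine is a hypothetical stand-in for an independent research agent, and the sleep simulates I/O-bound LLM latency; actual timings depend on the machine.

```python
import asyncio
import time


async def fake_agent(name: str, latency: float = 0.05) -> str:
    # Hypothetical stand-in for an I/O-bound LLM call.
    await asyncio.sleep(latency)
    return f"{name}: done"


async def run_serial(names: list) -> list:
    # Linear pipeline: each call waits for the previous one to finish.
    return [await fake_agent(n) for n in names]


async def run_parallel(names: list) -> list:
    # DAG-style scheduling: independent nodes are awaited together.
    return list(await asyncio.gather(*(fake_agent(n) for n in names)))


def timed(coro_fn, names):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(names))
    return result, time.perf_counter() - start


names = ["research_a", "research_b", "research_c"]
serial_result, serial_time = timed(run_serial, names)
parallel_result, parallel_time = timed(run_parallel, names)
print(f"serial: {serial_time:.2f}s, parallel: {parallel_time:.2f}s")
```

Both versions return the same results in the same order; only the wall-clock time differs, because the concurrent version overlaps the three waits.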
Pattern 1: Task Definition and Dependency Graph Construction
The first step in building an AI Agent workflow DAG engine is defining task nodes and their dependencies. We implement a type-annotated DAG definition system in Python that validates the graph (no cycles, no unreachable nodes) when it is built.
Base Data Models
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
import hashlib
import json
class NodeType(Enum):
LLM_CALL = "llm_call"
TOOL_CALL = "tool_call"
TRANSFORM = "transform"
CONDITION = "condition"
PARALLEL_GROUP = "parallel_group"
SUB_WORKFLOW = "sub_workflow"
HUMAN_APPROVAL = "human_approval"
class EdgeType(Enum):
NORMAL = "normal"
CONDITIONAL = "conditional"
@dataclass
class RetryPolicy:
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 60.0
backoff_factor: float = 2.0
retryable_exceptions: list[type[Exception]] = field(
default_factory=lambda: [Exception]
)
@dataclass
class NodeDefinition:
node_id: str
node_type: NodeType
handler: Callable[..., Any] | None = None
timeout_seconds: float = 300.0
retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
metadata: dict[str, Any] = field(default_factory=dict)
def __hash__(self):
return hash(self.node_id)
def __eq__(self, other):
if isinstance(other, NodeDefinition):
return self.node_id == other.node_id
return False
@dataclass
class EdgeDefinition:
source_id: str
target_id: str
edge_type: EdgeType = EdgeType.NORMAL
condition: Callable[..., bool] | None = None
condition_name: str = ""
def __hash__(self):
return hash((self.source_id, self.target_id, self.condition_name))
DAG Builder
class DAGBuilder:
def __init__(self, workflow_id: str, name: str = ""):
self.workflow_id = workflow_id
self.name = name
self._nodes: dict[str, NodeDefinition] = {}
self._edges: list[EdgeDefinition] = []
self._entry_node: str | None = None
def add_node(self, node: NodeDefinition) -> DAGBuilder:
if node.node_id in self._nodes:
raise ValueError(f"Node '{node.node_id}' already exists")
self._nodes[node.node_id] = node
return self
def add_edge(
self,
source_id: str,
target_id: str,
edge_type: EdgeType = EdgeType.NORMAL,
condition: Callable[..., bool] | None = None,
condition_name: str = "",
) -> DAGBuilder:
if source_id not in self._nodes:
raise ValueError(f"Source node '{source_id}' not found")
if target_id not in self._nodes:
raise ValueError(f"Target node '{target_id}' not found")
self._edges.append(
EdgeDefinition(
source_id=source_id,
target_id=target_id,
edge_type=edge_type,
condition=condition,
condition_name=condition_name,
)
)
return self
def set_entry(self, node_id: str) -> DAGBuilder:
if node_id not in self._nodes:
raise ValueError(f"Entry node '{node_id}' not found")
self._entry_node = node_id
return self
def build(self) -> DAGDefinition:
if not self._entry_node:
raise ValueError("Entry node not set")
dag = DAGDefinition(
workflow_id=self.workflow_id,
name=self.name,
nodes=dict(self._nodes),
edges=list(self._edges),
entry_node=self._entry_node,
)
dag.validate()
return dag
@dataclass
class DAGDefinition:
workflow_id: str
name: str
nodes: dict[str, NodeDefinition]
edges: list[EdgeDefinition]
entry_node: str
def validate(self):
self._check_cycle()
self._check_reachability()
def _check_cycle(self):
adjacency: dict[str, set[str]] = {
nid: set() for nid in self.nodes
}
for edge in self.edges:
adjacency[edge.source_id].add(edge.target_id)
visited: set[str] = set()
recursion_stack: set[str] = set()
def dfs(node_id: str) -> bool:
visited.add(node_id)
recursion_stack.add(node_id)
for neighbor in adjacency.get(node_id, set()):
if neighbor not in visited:
if dfs(neighbor):
return True
elif neighbor in recursion_stack:
return True
recursion_stack.remove(node_id)
return False
for node_id in self.nodes:
if node_id not in visited:
if dfs(node_id):
raise ValueError(
f"Cycle detected in DAG '{self.workflow_id}'"
)
def _check_reachability(self):
reachable: set[str] = set()
stack = [self.entry_node]
while stack:
current = stack.pop()
if current in reachable:
continue
reachable.add(current)
for edge in self.edges:
if edge.source_id == current:
stack.append(edge.target_id)
unreachable = set(self.nodes.keys()) - reachable
if unreachable:
raise ValueError(
f"Unreachable nodes detected: {unreachable}"
)
def fingerprint(self) -> str:
data = {
"nodes": sorted(self.nodes.keys()),
"edges": [
{"s": e.source_id, "t": e.target_id, "c": e.condition_name}
for e in sorted(
self.edges,
key=lambda e: (e.source_id, e.target_id),
)
],
}
raw = json.dumps(data, sort_keys=True)
return hashlib.sha256(raw.encode()).hexdigest()[:12]
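`_check_cycle` reports that a cycle exists but not where it is. A hedged variant of the same depth-first search that returns the offending path can make the error message actionable. `find_cycle` and its adjacency-dict input are illustrative names, not part of `DAGDefinition`.

```python
from typing import Optional


def find_cycle(adjacency: dict) -> Optional[list]:
    """Return one cycle as a node path (first node repeated at the end), or None."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color = {node: WHITE for node in adjacency}
    path = []

    def dfs(node: str) -> Optional[list]:
        color[node] = GRAY
        path.append(node)
        for neighbor in adjacency.get(node, []):
            state = color.get(neighbor, WHITE)
            if state == GRAY:
                # Back edge: the cycle is the path segment starting at neighbor.
                return path[path.index(neighbor):] + [neighbor]
            if state == WHITE:
                found = dfs(neighbor)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for node in adjacency:
        if color[node] == WHITE:
            cycle = dfs(node)
            if cycle:
                return cycle
    return None


graph = {"write": ["review"], "review": ["write"], "fetch": ["write"]}
print(find_cycle(graph))  # ['write', 'review', 'write']
```

Including the returned path in the `ValueError` message tells the workflow author which edge to remove or reroute.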
Example: Content Generation Workflow
def fetch_topic(state: dict) -> dict:
    return {"topic": state.get("input", "AI technology trends")}


def research(state: dict) -> dict:
    topic = state["topic"]
    return {"research_data": f"Deep research data on {topic}..."}


def analyze(state: dict) -> dict:
    data = state["research_data"]
    return {"analysis": f"Analysis conclusions based on {data}..."}


def write_draft(state: dict) -> dict:
    analysis = state["analysis"]
    return {"draft": f"Draft content based on analysis {analysis}..."}


def review(state: dict) -> dict:
    draft = state["draft"]
    return {"review_result": "approved", "final_content": draft}


def needs_revision(state: dict) -> bool:
    return state.get("review_result") == "needs_revision"


def is_approved(state: dict) -> bool:
    return state.get("review_result") == "approved"


builder = (
    DAGBuilder("content-gen-v1", "Content Generation Workflow")
    .add_node(NodeDefinition("fetch", NodeType.TRANSFORM, handler=fetch_topic))
    .add_node(NodeDefinition("research", NodeType.LLM_CALL, handler=research))
    .add_node(NodeDefinition("analyze", NodeType.LLM_CALL, handler=analyze))
    .add_node(NodeDefinition("write", NodeType.LLM_CALL, handler=write_draft))
    .add_node(NodeDefinition("review", NodeType.LLM_CALL, handler=review))
    # A "review" -> "write" back-edge would form the cycle write -> review -> write
    # and build() would raise "Cycle detected", so revisions go to their own node.
    .add_node(NodeDefinition("revise", NodeType.LLM_CALL, handler=write_draft))
    .add_edge("fetch", "research")
    .add_edge("research", "analyze")
    .add_edge("analyze", "write")
    .add_edge("write", "review")
    .add_edge(
        "review", "revise",
        EdgeType.CONDITIONAL,
        condition=needs_revision,
        condition_name="needs_revision",
    )
    .set_entry("fetch")
)
dag = builder.build()
print(f"DAG fingerprint: {dag.fingerprint()}")
Pattern 2: Topological Sort and Parallel Scheduling
The core scheduling capability of a DAG engine comes from topological sorting. After sorting, nodes at the same level have no mutual dependencies and can execute in parallel — this is the key to AI workflow engine performance.
Topological Sort and Level Computation
from collections import deque
class TopologicalSorter:
def __init__(self, dag: DAGDefinition):
self.dag = dag
self._adjacency: dict[str, set[str]] = {nid: set() for nid in dag.nodes}
self._in_degree: dict[str, int] = {nid: 0 for nid in dag.nodes}
for edge in dag.edges:
if edge.edge_type == EdgeType.NORMAL:
self._adjacency[edge.source_id].add(edge.target_id)
self._in_degree[edge.target_id] += 1
def sort(self) -> list[str]:
in_degree = dict(self._in_degree)
queue = deque(
nid for nid, deg in in_degree.items() if deg == 0
)
result = []
while queue:
node_id = queue.popleft()
result.append(node_id)
for neighbor in self._adjacency[node_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) != len(self.dag.nodes):
raise ValueError("DAG contains a cycle (should have been caught in validation)")
return result
def compute_levels(self) -> dict[str, int]:
levels: dict[str, int] = {}
order = self.sort()
for node_id in order:
max_parent_level = -1
for edge in self.dag.edges:
if edge.target_id == node_id and edge.edge_type == EdgeType.NORMAL:
parent_level = levels.get(edge.source_id, 0)
max_parent_level = max(max_parent_level, parent_level)
levels[node_id] = max_parent_level + 1
return levels
def get_parallel_groups(self) -> list[list[str]]:
levels = self.compute_levels()
max_level = max(levels.values()) if levels else 0
groups: list[list[str]] = []
for level in range(max_level + 1):
group = [nid for nid, lvl in levels.items() if lvl == level]
if group:
groups.append(group)
return groups
Parallel Scheduler
import asyncio
import time
from dataclasses import dataclass, field
@dataclass
class NodeResult:
node_id: str
status: str
output: dict = field(default_factory=dict)
error: str | None = None
start_time: float = 0.0
end_time: float = 0.0
retry_count: int = 0
@dataclass
class WorkflowResult:
workflow_id: str
execution_id: str
status: str
state: dict = field(default_factory=dict)
node_results: dict[str, NodeResult] = field(default_factory=dict)
total_time: float = 0.0
class DAGScheduler:
    def __init__(self, dag: DAGDefinition, max_concurrency: int = 10):
        self.dag = dag
        self.max_concurrency = max_concurrency
        self._sorter = TopologicalSorter(dag)
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def execute(self, initial_state: dict | None = None) -> WorkflowResult:
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        start_time = time.time()

        def finish(status: str) -> WorkflowResult:
            return WorkflowResult(
                workflow_id=self.dag.workflow_id,
                execution_id=execution_id,
                status=status,
                state=state,
                node_results=node_results,
                total_time=time.time() - start_time,
            )

        # Levels run in order; nodes within one level run concurrently.
        for group in self._sorter.get_parallel_groups():
            tasks = [
                self._execute_node(node_id, state, node_results)
                for node_id in group
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            failed = False
            for node_id, result in zip(group, results):
                if isinstance(result, BaseException):
                    result = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                node_results[node_id] = result
                # _execute_node returns status="failed" instead of raising,
                # so checking for exception objects alone would miss failures.
                if result.status == "failed":
                    failed = True
                else:
                    state.update(result.output)
            if failed:
                return finish("failed")
        return finish("completed")

    async def _execute_node(
        self,
        node_id: str,
        state: dict,
        node_results: dict[str, NodeResult],
    ) -> NodeResult:
        node = self.dag.nodes[node_id]
        start_time = time.time()
        async with self._semaphore:
            try:
                if node.handler is None:
                    raise ValueError(f"Node '{node_id}' has no handler")
                if asyncio.iscoroutinefunction(node.handler):
                    output = await node.handler(state)
                else:
                    # Sync handlers run in a worker thread so they do not
                    # block the event loop.
                    output = await asyncio.to_thread(node.handler, state)
                if not isinstance(output, dict):
                    output = {"result": output}
                return NodeResult(
                    node_id=node_id,
                    status="completed",
                    output=output,
                    start_time=start_time,
                    end_time=time.time(),
                )
            except Exception as e:
                return NodeResult(
                    node_id=node_id,
                    status="failed",
                    error=str(e),
                    start_time=start_time,
                    end_time=time.time(),
                )
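`NodeDefinition.timeout_seconds` is declared in Pattern 1, but `_execute_node` never reads it. One way to enforce it, sketched here under the assumption that handlers take a single state dict, is `asyncio.wait_for`. `run_with_timeout` and the two demo handlers are hypothetical names, not part of the scheduler.

```python
import asyncio
from typing import Any, Callable


async def run_with_timeout(
    handler: Callable[[dict], Any], state: dict, timeout_seconds: float
) -> Any:
    """Run a sync or async handler; raise asyncio.TimeoutError if it overruns."""
    if asyncio.iscoroutinefunction(handler):
        call = handler(state)
    else:
        # A worker thread cannot be cancelled: on timeout it keeps running
        # in the background while the caller moves on.
        call = asyncio.to_thread(handler, state)
    return await asyncio.wait_for(call, timeout=timeout_seconds)


async def slow_llm(state: dict) -> dict:
    await asyncio.sleep(1.0)  # simulated slow model call
    return {"answer": "late"}


async def fast_llm(state: dict) -> dict:
    return {"answer": "on time"}


async def demo():
    ok = await run_with_timeout(fast_llm, {}, timeout_seconds=0.5)
    try:
        await run_with_timeout(slow_llm, {}, timeout_seconds=0.05)
        outcome = "finished"
    except asyncio.TimeoutError:
        outcome = "timed out"
    return ok, outcome


ok, outcome = asyncio.run(demo())
print(ok, outcome)
```

Inside a scheduler, the timeout error would be caught by the existing `except Exception` branch and reported as a failed node, which also makes it eligible for the retry logic in Pattern 4.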
Execution Example
async def parallel_research_a(state: dict) -> dict:
await asyncio.sleep(0.1)
return {"research_a": "Technology trend research data"}
async def parallel_research_b(state: dict) -> dict:
await asyncio.sleep(0.1)
return {"research_b": "Market analysis research data"}
async def merge_research(state: dict) -> dict:
return {
"merged": f"{state.get('research_a', '')} + {state.get('research_b', '')}"
}
builder = (
DAGBuilder("parallel-research", "Parallel Research Workflow")
.add_node(NodeDefinition("start", NodeType.TRANSFORM, handler=lambda s: s))
.add_node(NodeDefinition("research_a", NodeType.LLM_CALL, handler=parallel_research_a))
.add_node(NodeDefinition("research_b", NodeType.LLM_CALL, handler=parallel_research_b))
.add_node(NodeDefinition("merge", NodeType.TRANSFORM, handler=merge_research))
.add_edge("start", "research_a")
.add_edge("start", "research_b")
.add_edge("research_a", "merge")
.add_edge("research_b", "merge")
.set_entry("start")
)
dag = builder.build()


async def main() -> None:
    # "await" is only valid inside a coroutine, so the run is wrapped in main().
    scheduler = DAGScheduler(dag)
    result = await scheduler.execute({"input": "AI Agent Workflow DAG Engine"})
    print(f"Status: {result.status}")
    print(f"Total time: {result.total_time:.3f}s")
    print(f"Parallel groups: {TopologicalSorter(dag).get_parallel_groups()}")


asyncio.run(main())
Pattern 3: Conditional Routing and Branch Merging
Real AI workflows don't follow a single path. Dynamically selecting execution paths based on Agent output, data quality, or user preferences is a core capability of DAG orchestration.
Conditional Router Implementation
class ConditionalRouter:
def __init__(self, dag: DAGDefinition):
self.dag = dag
self._conditional_edges: dict[str, list[EdgeDefinition]] = {}
for edge in dag.edges:
if edge.edge_type == EdgeType.CONDITIONAL:
self._conditional_edges.setdefault(edge.source_id, []).append(edge)
def resolve_next_nodes(
self, node_id: str, state: dict
) -> list[str]:
next_nodes: list[str] = []
for edge in self.dag.edges:
if edge.source_id != node_id:
continue
if edge.edge_type == EdgeType.NORMAL:
next_nodes.append(edge.target_id)
elif edge.edge_type == EdgeType.CONDITIONAL:
if edge.condition and edge.condition(state):
next_nodes.append(edge.target_id)
return next_nodes
def get_all_branches(self) -> dict[str, list[str]]:
branches: dict[str, list[str]] = {}
for source_id, edges in self._conditional_edges.items():
branches[source_id] = [
f"{e.condition_name} → {e.target_id}" for e in edges
]
return branches
Scheduler with Conditional Routing
class ConditionalDAGScheduler(DAGScheduler):
    def __init__(self, dag: DAGDefinition, max_concurrency: int = 10):
        super().__init__(dag, max_concurrency)
        self._router = ConditionalRouter(dag)

    async def execute(self, initial_state: dict | None = None) -> WorkflowResult:
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        start_time = time.time()
        completed: set[str] = set()
        skipped: set[str] = set()  # nodes on branches that were not taken
        pending: set[str] = {self.dag.entry_node}

        def finish(status: str) -> WorkflowResult:
            return WorkflowResult(
                workflow_id=self.dag.workflow_id,
                execution_id=execution_id,
                status=status,
                state=state,
                node_results=node_results,
                total_time=time.time() - start_time,
            )

        while pending:
            # A node is ready once every dependency has completed or been
            # skipped. Requiring all of them to complete would deadlock any
            # merge node that sits behind mutually exclusive branches.
            ready = [
                node_id for node_id in pending
                if self._get_dependencies(node_id).issubset(completed | skipped)
            ]
            if not ready:
                raise RuntimeError(
                    f"Deadlock detected. Pending: {pending}, Completed: {completed}"
                )
            tasks = [
                self._execute_node(node_id, state, node_results)
                for node_id in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            failed = False
            for node_id, result in zip(ready, results):
                if isinstance(result, BaseException):
                    result = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                node_results[node_id] = result
                # Handler errors come back as status="failed", not as exceptions.
                if result.status == "failed":
                    failed = True
                    continue
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)
                for next_id in self._router.resolve_next_nodes(node_id, state):
                    if next_id not in completed:
                        pending.add(next_id)
            if failed:
                return finish("failed")
            self._mark_skipped(completed, skipped, pending)
        return finish("completed")

    def _mark_skipped(
        self, completed: set[str], skipped: set[str], pending: set[str]
    ) -> None:
        # A node that was never activated is skipped once all of its
        # dependencies are resolved (completed or skipped themselves).
        # Repeat until stable so skips propagate down untaken branches.
        changed = True
        while changed:
            changed = False
            for node_id in self.dag.nodes:
                if node_id in completed or node_id in pending or node_id in skipped:
                    continue
                deps = self._get_dependencies(node_id)
                if deps and deps.issubset(completed | skipped):
                    skipped.add(node_id)
                    changed = True

    def _get_dependencies(self, node_id: str) -> set[str]:
        deps: set[str] = set()
        for edge in self.dag.edges:
            if edge.target_id == node_id:
                deps.add(edge.source_id)
        return deps
Example: Smart Customer Service Routing
def classify_intent(state: dict) -> dict:
user_input = state.get("user_input", "")
if "refund" in user_input.lower():
return {"intent": "refund", "confidence": 0.95}
elif "technical" in user_input.lower() or "bug" in user_input.lower():
return {"intent": "technical", "confidence": 0.90}
else:
return {"intent": "general", "confidence": 0.80}
def handle_refund(state: dict) -> dict:
return {"response": "Refund process initiated, expected in 3-5 business days"}
def handle_technical(state: dict) -> dict:
return {"response": "Technical support team notified, response within 2 hours"}
def handle_general(state: dict) -> dict:
return {"response": "Thank you for your inquiry, a representative will assist you shortly"}
def is_refund(state: dict) -> bool:
return state.get("intent") == "refund"
def is_technical(state: dict) -> bool:
return state.get("intent") == "technical"
def is_general(state: dict) -> bool:
return state.get("intent") == "general"
builder = (
DAGBuilder("customer-service", "Smart Customer Service Routing")
.add_node(NodeDefinition("classify", NodeType.LLM_CALL, handler=classify_intent))
.add_node(NodeDefinition("refund_handler", NodeType.LLM_CALL, handler=handle_refund))
.add_node(NodeDefinition("tech_handler", NodeType.LLM_CALL, handler=handle_technical))
.add_node(NodeDefinition("general_handler", NodeType.LLM_CALL, handler=handle_general))
.add_node(NodeDefinition("respond", NodeType.TRANSFORM, handler=lambda s: {"final": s.get("response", "")}))
.add_edge("classify", "refund_handler", EdgeType.CONDITIONAL, is_refund, "is_refund")
.add_edge("classify", "tech_handler", EdgeType.CONDITIONAL, is_technical, "is_technical")
.add_edge("classify", "general_handler", EdgeType.CONDITIONAL, is_general, "is_general")
.add_edge("refund_handler", "respond")
.add_edge("tech_handler", "respond")
.add_edge("general_handler", "respond")
.set_entry("classify")
)
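dag = builder.build()


async def main() -> None:
    # "await" is only valid inside a coroutine, so the run is wrapped in main().
    scheduler = ConditionalDAGScheduler(dag)
    result = await scheduler.execute({"user_input": "I need a refund, the product is defective"})
    print(f"Response: {result.state.get('final', '')}")


asyncio.run(main())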
Pattern 4: Error Recovery and Retry Strategies
In AI workflows, LLM calls and API requests can fail at any time. A DAG engine without retry and error recovery is unacceptable in production.
Retry Executor Implementation
import random
import logging
logger = logging.getLogger(__name__)
class RetryExecutor:
def __init__(self, retry_policy: RetryPolicy):
self.policy = retry_policy
async def execute_with_retry(
self,
handler: Callable[..., Any],
state: dict,
node_id: str,
) -> NodeResult:
last_error: Exception | None = None
retry_count = 0
for attempt in range(self.policy.max_retries + 1):
try:
if asyncio.iscoroutinefunction(handler):
output = await handler(state)
else:
output = await asyncio.to_thread(handler, state)
if not isinstance(output, dict):
output = {"result": output}
return NodeResult(
node_id=node_id,
status="completed",
output=output,
retry_count=retry_count,
)
except tuple(self.policy.retryable_exceptions) as e:
last_error = e
retry_count += 1
if attempt < self.policy.max_retries:
delay = min(
self.policy.base_delay
* (self.policy.backoff_factor ** attempt),
self.policy.max_delay,
)
jitter = random.uniform(0, delay * 0.1)
logger.warning(
f"Node '{node_id}' failed (attempt {attempt + 1}/"
f"{self.policy.max_retries + 1}), "
f"retrying in {delay + jitter:.2f}s: {e}"
)
await asyncio.sleep(delay + jitter)
except Exception as e:
last_error = e
break
return NodeResult(
node_id=node_id,
status="failed",
error=str(last_error),
retry_count=retry_count,
)
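To see which delays a policy produces, the backoff formula used in `execute_with_retry` can be evaluated on its own. This sketch leaves out the jitter so the numbers are deterministic; `backoff_delays` is an illustrative helper, not part of `RetryExecutor`.

```python
def backoff_delays(
    max_retries: int,
    base_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
) -> list:
    """Delay before each retry: min(base_delay * backoff_factor**attempt, max_delay)."""
    return [
        min(base_delay * backoff_factor ** attempt, max_delay)
        for attempt in range(max_retries)
    ]


print(backoff_delays(3))                  # [1.0, 2.0, 4.0]
print(backoff_delays(8))                  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
print(backoff_delays(3, base_delay=0.5))  # [0.5, 1.0, 2.0]
```

With the default `RetryPolicy`, a node that keeps failing waits about 7 seconds in total before it is reported as failed, plus up to 10% jitter on each delay.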
Scheduler with Retry and Fallback
class ResilientDAGScheduler(ConditionalDAGScheduler):
def __init__(
self,
dag: DAGDefinition,
max_concurrency: int = 10,
fallback_handlers: dict[str, Callable] | None = None,
):
super().__init__(dag, max_concurrency)
self._fallback_handlers = fallback_handlers or {}
async def _execute_node(
self,
node_id: str,
state: dict,
node_results: dict[str, NodeResult],
) -> NodeResult:
node = self.dag.nodes[node_id]
retry_executor = RetryExecutor(node.retry_policy)
result = await retry_executor.execute_with_retry(
node.handler, state, node_id
)
if result.status == "failed" and node_id in self._fallback_handlers:
logger.info(f"Node '{node_id}' failed, executing fallback handler")
try:
fallback = self._fallback_handlers[node_id]
if asyncio.iscoroutinefunction(fallback):
output = await fallback(state)
else:
output = await asyncio.to_thread(fallback, state)
if not isinstance(output, dict):
output = {"result": output}
return NodeResult(
node_id=node_id,
status="completed_with_fallback",
output=output,
retry_count=result.retry_count,
)
except Exception as fallback_error:
result.error = f"Primary: {result.error}; Fallback: {fallback_error}"
return result
Usage Example
async def call_llm_with_retry(state: dict) -> dict:
if random.random() < 0.5:
raise ConnectionError("LLM API timeout")
return {"llm_response": "Analysis results..."}
def fallback_llm(state: dict) -> dict:
return {"llm_response": "Fallback: using cached results"}
builder = (
DAGBuilder("resilient-workflow", "Resilient Workflow")
.add_node(
NodeDefinition(
"llm_call",
NodeType.LLM_CALL,
handler=call_llm_with_retry,
retry_policy=RetryPolicy(
max_retries=3,
base_delay=0.5,
retryable_exceptions=[ConnectionError, TimeoutError],
),
)
)
.set_entry("llm_call")
)
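dag = builder.build()


async def main() -> None:
    # "await" is only valid inside a coroutine, so the run is wrapped in main().
    scheduler = ResilientDAGScheduler(
        dag,
        fallback_handlers={"llm_call": fallback_llm},
    )
    result = await scheduler.execute({"input": "test"})
    print(f"Status: {result.status}")


asyncio.run(main())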
Pattern 5: State Persistence and Checkpoint Recovery
Long-running AI workflows (multi-round Agent collaboration, large-scale data processing) must support state persistence. When a workflow crashes mid-execution, you need to recover from the checkpoint instead of starting over.
Checkpoint Manager
import json
from pathlib import Path
from datetime import datetime
class CheckpointManager:
def __init__(self, storage_dir: str = ".checkpoints"):
self._storage = Path(storage_dir)
self._storage.mkdir(parents=True, exist_ok=True)
def save(
self,
workflow_id: str,
execution_id: str,
state: dict,
completed_nodes: set[str],
pending_nodes: set[str],
node_results: dict[str, NodeResult],
) -> str:
checkpoint_id = f"cp-{int(time.time() * 1000)}"
checkpoint_data = {
"checkpoint_id": checkpoint_id,
"workflow_id": workflow_id,
"execution_id": execution_id,
"state": state,
"completed_nodes": list(completed_nodes),
"pending_nodes": list(pending_nodes),
"node_results": {
nid: {
"node_id": r.node_id,
"status": r.status,
"output": r.output,
"error": r.error,
"retry_count": r.retry_count,
}
for nid, r in node_results.items()
},
"saved_at": datetime.now().isoformat(),
}
filepath = self._storage / f"{workflow_id}_{execution_id}.json"
with open(filepath, "w", encoding="utf-8") as f:
json.dump(checkpoint_data, f, ensure_ascii=False, indent=2)
return checkpoint_id
def load(
self, workflow_id: str, execution_id: str
) -> dict | None:
filepath = self._storage / f"{workflow_id}_{execution_id}.json"
if not filepath.exists():
return None
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
def list_checkpoints(self, workflow_id: str) -> list[dict]:
checkpoints = []
for fp in self._storage.glob(f"{workflow_id}_*.json"):
with open(fp, "r", encoding="utf-8") as f:
data = json.load(f)
checkpoints.append({
"execution_id": data["execution_id"],
"saved_at": data["saved_at"],
"completed": len(data["completed_nodes"]),
"pending": len(data["pending_nodes"]),
})
return sorted(checkpoints, key=lambda x: x["saved_at"], reverse=True)
def cleanup(self, workflow_id: str, keep_last: int = 5):
checkpoints = self.list_checkpoints(workflow_id)
for cp in checkpoints[keep_last:]:
filepath = self._storage / f"{workflow_id}_{cp['execution_id']}.json"
filepath.unlink(missing_ok=True)
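`save` writes directly to the checkpoint file, so a crash in the middle of `json.dump` can leave a truncated file that `load` cannot parse. A common remedy, sketched here, is to write to a temporary file in the same directory and then rename it with `os.replace`, which is atomic on POSIX systems when both paths are on the same filesystem. `atomic_write_json` is a hypothetical helper name, and the file name in the demo is made up.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON so readers see either the old file or the complete new one."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so the rename
    # does not cross filesystems.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the partial temp file; the previous checkpoint is untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "wf_exec-1.json"
    atomic_write_json(target, {"completed_nodes": ["fetch"]})
    atomic_write_json(target, {"completed_nodes": ["fetch", "research"]})
    restored = json.loads(target.read_text(encoding="utf-8"))
    leftovers = [p.name for p in Path(d).iterdir()]
print(restored, leftovers)
```

This only protects the file itself. State values that are not JSON-serializable still make `json.dump` raise, in which case the previous checkpoint stays in place.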
Scheduler with Checkpoint Recovery
class PersistentDAGScheduler(ResilientDAGScheduler):
    def __init__(
        self,
        dag: DAGDefinition,
        checkpoint_manager: CheckpointManager,
        max_concurrency: int = 10,
        checkpoint_interval: int = 1,
        fallback_handlers: dict[str, Callable] | None = None,
    ):
        super().__init__(dag, max_concurrency, fallback_handlers)
        self._checkpoint_mgr = checkpoint_manager
        self._checkpoint_interval = checkpoint_interval

    async def execute(
        self,
        initial_state: dict | None = None,
        execution_id: str | None = None,
    ) -> WorkflowResult:
        if execution_id:
            return await self._resume(execution_id, initial_state)
        return await self._run_from_start(initial_state)

    async def _run_from_start(
        self, initial_state: dict | None = None
    ) -> WorkflowResult:
        return await self._run_loop(
            execution_id=f"exec-{int(time.time() * 1000)}",
            state=dict(initial_state or {}),
            completed=set(),
            pending={self.dag.entry_node},
            node_results={},
        )

    async def _resume(
        self,
        execution_id: str,
        initial_state: dict | None = None,
    ) -> WorkflowResult:
        checkpoint = self._checkpoint_mgr.load(
            self.dag.workflow_id, execution_id
        )
        if not checkpoint:
            raise ValueError(
                f"No checkpoint found for {self.dag.workflow_id}/{execution_id}"
            )
        state = checkpoint["state"]
        if initial_state:
            state.update(initial_state)
        completed = set(checkpoint["completed_nodes"])
        pending = set(checkpoint["pending_nodes"])
        node_results = {
            nid: NodeResult(
                node_id=r["node_id"],
                status=r["status"],
                output=r["output"],
                error=r.get("error"),
                retry_count=r.get("retry_count", 0),
            )
            for nid, r in checkpoint["node_results"].items()
        }
        # Re-queue nodes that failed in the previous run.
        pending.update(
            nid for nid, r in node_results.items() if r.status == "failed"
        )
        return await self._run_loop(
            execution_id, state, completed, pending, node_results
        )

    async def _run_loop(
        self,
        execution_id: str,
        state: dict,
        completed: set[str],
        pending: set[str],
        node_results: dict[str, NodeResult],
    ) -> WorkflowResult:
        # Shared by fresh runs and resumes, which differ only in their inputs.
        start_time = time.time()
        steps_since_checkpoint = 0
        skipped: set[str] = set()
        # Skipped branches are not stored in the checkpoint; they are
        # recomputed from the completed and pending sets.
        self._mark_skipped(completed, skipped, pending)

        def save() -> None:
            self._checkpoint_mgr.save(
                self.dag.workflow_id, execution_id,
                state, completed, pending, node_results,
            )

        def finish(status: str) -> WorkflowResult:
            return WorkflowResult(
                workflow_id=self.dag.workflow_id,
                execution_id=execution_id,
                status=status,
                state=state,
                node_results=node_results,
                total_time=time.time() - start_time,
            )

        while pending:
            ready = [
                nid for nid in pending
                if self._get_dependencies(nid).issubset(completed | skipped)
            ]
            if not ready:
                save()
                raise RuntimeError("Deadlock in DAG execution")
            tasks = [
                self._execute_node(nid, state, node_results)
                for nid in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            failed = False
            for node_id, result in zip(ready, results):
                if isinstance(result, BaseException):
                    result = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                node_results[node_id] = result
                if result.status == "failed":
                    failed = True  # stays in pending so a resume retries it
                    continue
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)
                for next_id in self._router.resolve_next_nodes(node_id, state):
                    if next_id not in completed:
                        pending.add(next_id)
                steps_since_checkpoint += 1
            self._mark_skipped(completed, skipped, pending)
            if failed:
                save()
                return finish("failed")
            if steps_since_checkpoint >= self._checkpoint_interval:
                save()
                steps_since_checkpoint = 0
        return finish("completed")

    def _mark_skipped(
        self, completed: set[str], skipped: set[str], pending: set[str]
    ) -> None:
        # A node that was never activated is skipped once all of its
        # dependencies are resolved, so merge nodes behind untaken
        # conditional branches do not wait forever.
        changed = True
        while changed:
            changed = False
            for nid in self.dag.nodes:
                if nid in completed or nid in pending or nid in skipped:
                    continue
                deps = self._get_dependencies(nid)
                if deps and deps.issubset(completed | skipped):
                    skipped.add(nid)
                    changed = True
Pattern 6: Dynamic DAG and Subgraph Nesting
In production, DAGs aren't static. Dynamically generating subtasks based on runtime data and nesting sub-workflows are key capabilities for advanced orchestration.
Dynamic DAG Generation
class DynamicDAGGenerator:
    def __init__(self, base_dag: DAGDefinition):
        self.base_dag = base_dag

    def generate_dynamic_nodes(
        self,
        state: dict,
        dynamic_node_factory: Callable[[dict], list[NodeDefinition]],
        dependency_resolver: Callable[[list[NodeDefinition], dict], list[EdgeDefinition]],
    ) -> tuple[list[NodeDefinition], list[EdgeDefinition]]:
        # Build nodes from runtime state, then derive the edges that wire them in.
        new_nodes = dynamic_node_factory(state)
        new_edges = dependency_resolver(new_nodes, state)
        return new_nodes, new_edges

    def merge_into_base(
        self,
        new_nodes: list[NodeDefinition],
        new_edges: list[EdgeDefinition],
        attach_after: str,
    ) -> DAGDefinition:
        # NOTE: attach_after is not used in this body; the attachment points
        # are determined entirely by the source/target IDs in new_edges.
        builder = DAGBuilder(
            f"{self.base_dag.workflow_id}-dynamic",
            f"{self.base_dag.name} (dynamic)",
        )
        # Copy the base graph first, then layer the dynamic nodes and edges on top.
        for node in self.base_dag.nodes.values():
            builder.add_node(node)
        for edge in self.base_dag.edges:
            builder.add_edge(
                edge.source_id, edge.target_id,
                edge.edge_type, edge.condition, edge.condition_name,
            )
        for node in new_nodes:
            builder.add_node(node)
        for edge in new_edges:
            builder.add_edge(
                edge.source_id, edge.target_id,
                edge.edge_type, edge.condition, edge.condition_name,
            )
        builder.set_entry(self.base_dag.entry_node)
        return builder.build()
Subgraph Nesting
class SubWorkflowNode:
    def __init__(
        self,
        sub_dag: DAGDefinition,
        scheduler_class: type = ConditionalDAGScheduler,
        max_concurrency: int = 5,
    ):
        self.sub_dag = sub_dag
        self._scheduler_class = scheduler_class
        self._max_concurrency = max_concurrency

    async def execute(self, state: dict) -> dict:
        scheduler = self._scheduler_class(
            self.sub_dag, max_concurrency=self._max_concurrency
        )
        result = await scheduler.execute(state)
        if result.status != "completed":
            raise RuntimeError(
                f"Sub-workflow '{self.sub_dag.workflow_id}' failed: "
                f"{[r.error for r in result.node_results.values() if r.error]}"
            )
        return result.state


def create_sub_workflow_node(
    node_id: str,
    sub_dag: DAGDefinition,
    max_concurrency: int = 5,
) -> NodeDefinition:
    sub_executor = SubWorkflowNode(sub_dag, max_concurrency=max_concurrency)
    return NodeDefinition(
        node_id=node_id,
        node_type=NodeType.SUB_WORKFLOW,
        handler=sub_executor.execute,
        metadata={"sub_workflow_id": sub_dag.workflow_id},
    )
Example: Multi-Source Data Collection
def create_data_source_nodes(state: dict) -> list[NodeDefinition]:
    sources = state.get("data_sources", ["api", "database", "file"])
    nodes = []
    for source in sources:
        # Bind the loop variable via a default argument to avoid late binding.
        async def fetch_data(s: dict, src=source) -> dict:
            await asyncio.sleep(0.1)
            return {f"{src}_data": f"Data from {src}"}

        nodes.append(
            NodeDefinition(
                f"fetch_{source}",
                NodeType.TOOL_CALL,
                handler=fetch_data,
            )
        )
    return nodes


def resolve_dynamic_edges(
    new_nodes: list[NodeDefinition], state: dict
) -> list[EdgeDefinition]:
    edges = []
    for node in new_nodes:
        edges.append(EdgeDefinition(source_id="start", target_id=node.node_id))
        edges.append(EdgeDefinition(source_id=node.node_id, target_id="aggregate"))
    return edges
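# Base subgraph. The start -> aggregate edge keeps "aggregate" reachable,
# so the base DAG passes reachability validation before fan-out nodes exist.
base_sub_dag = (
    DAGBuilder("data-collection", "Data Collection Subgraph")
    .add_node(NodeDefinition("start", NodeType.TRANSFORM, handler=lambda s: s))
    .add_node(NodeDefinition("aggregate", NodeType.TRANSFORM, handler=lambda s: {"aggregated": "all data merged"}))
    .add_edge("start", "aggregate")
    .set_entry("start")
    .build()
)
# Attach one fetch node per source between "start" and "aggregate".
# The source list is fixed here for illustration.
generator = DynamicDAGGenerator(base_sub_dag)
new_nodes, new_edges = generator.generate_dynamic_nodes(
    {"data_sources": ["api", "database", "file"]},
    create_data_source_nodes,
    resolve_dynamic_edges,
)
sub_dag = generator.merge_into_base(new_nodes, new_edges, attach_after="start")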
builder = (
    DAGBuilder("main-workflow", "Main Workflow")
    .add_node(NodeDefinition("plan", NodeType.LLM_CALL, handler=lambda s: {**s, "data_sources": ["api", "database", "file"]}))
    .add_node(create_sub_workflow_node("collect", sub_dag))
    .add_node(NodeDefinition("report", NodeType.LLM_CALL, handler=lambda s: {"report": "Final report"}))
    .add_edge("plan", "collect")
    .add_edge("collect", "report")
    .set_entry("plan")
)
main_dag = builder.build()


async def main() -> WorkflowResult:
    scheduler = ConditionalDAGScheduler(main_dag)
    return await scheduler.execute({"input": "Generate data collection report"})


# `await` is only valid inside a coroutine, so drive the workflow with asyncio.run.
result = asyncio.run(main())
Pattern 7: Production Monitoring and Alerting
Once an AI Agent workflow DAG engine goes live, monitoring is the lifeline of operations. You need to know each node's execution time, success rate, and error distribution.
Metrics Collector
from collections import defaultdict
import statistics


@dataclass
class NodeMetrics:
    node_id: str
    total_executions: int = 0
    success_count: int = 0
    failure_count: int = 0
    fallback_count: int = 0
    total_retry_count: int = 0
    execution_times: list[float] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        if self.total_executions == 0:
            return 0.0
        return self.success_count / self.total_executions

    @property
    def avg_execution_time(self) -> float:
        if not self.execution_times:
            return 0.0
        return statistics.mean(self.execution_times)

    @property
    def p95_execution_time(self) -> float:
        if len(self.execution_times) < 2:
            return self.avg_execution_time
        sorted_times = sorted(self.execution_times)
        idx = int(len(sorted_times) * 0.95)
        return sorted_times[min(idx, len(sorted_times) - 1)]

    @property
    def p99_execution_time(self) -> float:
        if len(self.execution_times) < 2:
            return self.avg_execution_time
        sorted_times = sorted(self.execution_times)
        idx = int(len(sorted_times) * 0.99)
        return sorted_times[min(idx, len(sorted_times) - 1)]
class MetricsCollector:
    def __init__(self):
        self._node_metrics: dict[str, NodeMetrics] = defaultdict(
            lambda: NodeMetrics(node_id="")
        )
        self._workflow_count = 0
        self._workflow_success = 0
        self._workflow_failure = 0

    def record_node_result(self, result: NodeResult):
        metrics = self._node_metrics[result.node_id]
        metrics.node_id = result.node_id
        metrics.total_executions += 1
        metrics.total_retry_count += result.retry_count
        if result.status == "completed":
            metrics.success_count += 1
        elif result.status == "completed_with_fallback":
            # A fallback still counts as a success, but is tracked separately.
            metrics.fallback_count += 1
            metrics.success_count += 1
        else:
            metrics.failure_count += 1
        exec_time = result.end_time - result.start_time
        if exec_time > 0:
            metrics.execution_times.append(exec_time)

    def record_workflow_result(self, result: WorkflowResult):
        self._workflow_count += 1
        if result.status == "completed":
            self._workflow_success += 1
        else:
            self._workflow_failure += 1
        for node_result in result.node_results.values():
            self.record_node_result(node_result)

    def get_node_metrics(self, node_id: str) -> NodeMetrics | None:
        return self._node_metrics.get(node_id)

    def get_all_metrics(self) -> dict[str, NodeMetrics]:
        return dict(self._node_metrics)

    def summary(self) -> dict:
        return {
            "total_workflows": self._workflow_count,
            "success_workflows": self._workflow_success,
            "failed_workflows": self._workflow_failure,
            "workflow_success_rate": (
                self._workflow_success / self._workflow_count
                if self._workflow_count > 0
                else 0.0
            ),
            "nodes": {
                nid: {
                    "success_rate": f"{m.success_rate:.2%}",
                    "avg_time": f"{m.avg_execution_time:.3f}s",
                    "p95_time": f"{m.p95_execution_time:.3f}s",
                    "p99_time": f"{m.p99_execution_time:.3f}s",
                    "total_retries": m.total_retry_count,
                    "fallback_count": m.fallback_count,
                }
                for nid, m in self._node_metrics.items()
            },
        }
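The `p95_execution_time` and `p99_execution_time` properties pick a single sample by index, which is coarse when only a few samples exist. As an alternative sketch, the standard library's `statistics.quantiles` interpolates between neighboring samples. The `percentile` helper and the sample latencies below are illustrative assumptions, not part of the engine.

```python
import statistics


def percentile(samples: list[float], pct: int) -> float:
    """Interpolated percentile (1-99) using statistics.quantiles."""
    if not 1 <= pct <= 99:
        raise ValueError("pct must be between 1 and 99")
    if not samples:
        return 0.0
    if len(samples) == 1:
        return samples[0]
    # n=100 yields 99 cut points; cut point i (0-based) is the (i+1)th percentile.
    cut_points = statistics.quantiles(samples, n=100, method="inclusive")
    return cut_points[pct - 1]


# Hypothetical node latencies in seconds, with one slow outlier.
latencies = [0.8, 1.1, 0.9, 1.0, 12.5, 1.2, 0.95, 1.05, 1.15, 0.85]
print(f"p50={percentile(latencies, 50):.3f}s  p95={percentile(latencies, 95):.3f}s")
```

With small sample counts the two approaches can differ noticeably, so it is worth choosing one definition and using it consistently across dashboards and alert thresholds.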
Alert Rules
class AlertRule:
    def __init__(
        self,
        name: str,
        condition: Callable[[NodeMetrics], bool],
        severity: str = "warning",
        message_template: str = "",
    ):
        self.name = name
        self.condition = condition
        self.severity = severity
        self.message_template = message_template

    def check(self, metrics: NodeMetrics) -> str | None:
        if self.condition(metrics):
            return self.message_template.format(
                node_id=metrics.node_id,
                success_rate=f"{metrics.success_rate:.2%}",
                avg_time=f"{metrics.avg_execution_time:.3f}s",
            )
        return None


class AlertManager:
    def __init__(self):
        self._rules: list[AlertRule] = []
        self._alerts: list[dict] = []

    def add_rule(self, rule: AlertRule):
        self._rules.append(rule)

    def check_metrics(self, metrics_collector: MetricsCollector):
        for node_id, metrics in metrics_collector.get_all_metrics().items():
            for rule in self._rules:
                alert_msg = rule.check(metrics)
                if alert_msg:
                    self._alerts.append({
                        "rule": rule.name,
                        "severity": rule.severity,
                        "node_id": node_id,
                        "message": alert_msg,
                        "timestamp": datetime.now().isoformat(),
                    })

    def get_alerts(self, severity: str | None = None) -> list[dict]:
        if severity:
            return [a for a in self._alerts if a["severity"] == severity]
        return list(self._alerts)
alert_mgr = AlertManager()
alert_mgr.add_rule(AlertRule(
    name="low_success_rate",
    condition=lambda m: m.total_executions >= 5 and m.success_rate < 0.8,
    severity="critical",
    message_template="Node {node_id} success rate {success_rate} below 80%",
))
alert_mgr.add_rule(AlertRule(
    name="high_latency",
    condition=lambda m: m.avg_execution_time > 30.0,
    severity="warning",
    message_template="Node {node_id} avg execution time {avg_time} exceeds 30s",
))
alert_mgr.add_rule(AlertRule(
    name="high_retry_rate",
    condition=lambda m: m.total_executions > 0
    and m.total_retry_count / m.total_executions > 2.0,
    severity="warning",
    message_template="Node {node_id} has high retry rate, avg retries per execution > 2",
))
5 Common Pitfalls and Solutions
Pitfall 1: Missing Cycle Detection for Conditional Edges
Conditional edges are only activated at runtime, so static cycle detection may miss runtime cycles.
def validate_conditional_cycles(dag: DAGDefinition):
    all_edges = list(dag.edges)
    for edge in all_edges:
        if edge.edge_type == EdgeType.CONDITIONAL:
            # Swap this conditional edge for a normal one, then rerun the static check.
            test_edges = [
                e for e in all_edges
                if not (e.source_id == edge.source_id
                        and e.target_id == edge.target_id
                        and e.edge_type == EdgeType.CONDITIONAL)
            ]
            test_edges.append(EdgeDefinition(
                source_id=edge.source_id,
                target_id=edge.target_id,
                edge_type=EdgeType.NORMAL,
            ))
            test_dag = DAGDefinition(
                workflow_id=dag.workflow_id + "-test",
                name=dag.name,
                nodes=dag.nodes,
                edges=test_edges,
                entry_node=dag.entry_node,
            )
            try:
                test_dag._check_cycle()
            except ValueError:
                raise ValueError(
                    f"Conditional edge '{edge.source_id}' → '{edge.target_id}' "
                    f"may create a runtime cycle"
                )
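Solution: Run cycle detection with conditional edges treated as activated. The check above promotes one conditional edge at a time, so if `_check_cycle` skips conditional edges, a loop that only forms when two or more conditional edges fire together requires promoting all of them at once.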
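A stricter variant, sketched here under simplifying assumptions, treats every edge as active at the same time and runs Kahn's algorithm over plain `(source, target)` tuples. The `find_cycle_nodes` helper and the tuple representation are illustrative and not part of the engine's API; in the engine, the pairs would be derived from `dag.edges`.

```python
from collections import defaultdict, deque


def find_cycle_nodes(edges: list[tuple[str, str]]) -> set[str]:
    """Return nodes that cannot be topologically ordered (empty set if acyclic).

    Every edge is treated as active, so a loop that only appears when
    several conditional edges fire together is still detected.
    """
    successors: dict[str, list[str]] = defaultdict(list)
    in_degree: dict[str, int] = defaultdict(int)
    nodes: set[str] = set()
    for source, target in edges:
        successors[source].append(target)
        in_degree[target] += 1
        nodes.update((source, target))

    # Kahn's algorithm: repeatedly remove nodes with no incoming edges.
    queue = deque(n for n in nodes if in_degree[n] == 0)
    ordered: set[str] = set()
    while queue:
        node = queue.popleft()
        ordered.add(node)
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                queue.append(nxt)

    # Whatever is left sits on a cycle or downstream of one.
    return nodes - ordered


# Hypothetical graph: the review -> draft edge stands in for a conditional edge.
edges = [("plan", "draft"), ("draft", "review"), ("review", "draft"), ("review", "publish")]
print(find_cycle_nodes(edges))
```

An empty result means no combination of conditions can loop. A non-empty result lists the nodes to inspect; intentional retry loops would need an explicit iteration cap instead of a cyclic edge.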
Pitfall 2: Parallel Node Write Conflicts
Multiple parallel nodes modifying the same key in state cause data overwrites.
def validate_parallel_write_safety(dag: DAGDefinition):
    levels = TopologicalSorter(dag).compute_levels()
    # Group nodes by level; nodes on the same level may run in parallel.
    level_groups: dict[int, list[str]] = {}
    for nid, level in levels.items():
        level_groups.setdefault(level, []).append(nid)
    for level, nodes in level_groups.items():
        if len(nodes) <= 1:
            continue
        output_keys: dict[str, list[str]] = {}
        for nid in nodes:
            node = dag.nodes[nid]
            keys = node.metadata.get("output_keys", [])
            for key in keys:
                output_keys.setdefault(key, []).append(nid)
        conflicts = {k: v for k, v in output_keys.items() if len(v) > 1}
        if conflicts:
            raise ValueError(
                f"Parallel write conflict at level {level}: {conflicts}"
            )
Solution: Check for output key conflicts among parallel nodes during DAG validation, or use namespace isolation.
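Namespace isolation can be as small as a wrapper that prefixes each output key with the node ID. The sketch below assumes synchronous handlers that take and return plain dicts; the `namespaced` helper and the `node_id.key` naming scheme are illustrative choices, not part of the engine.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def namespaced(node_id: str, handler: Handler) -> Handler:
    """Wrap a synchronous handler so its output keys are prefixed with the node ID."""

    def wrapper(state: dict[str, Any]) -> dict[str, Any]:
        output = handler(state)
        return {f"{node_id}.{key}": value for key, value in output.items()}

    return wrapper


# Two hypothetical parallel handlers that both write "summary".
research_a = namespaced("research_a", lambda s: {"summary": "findings A"})
research_b = namespaced("research_b", lambda s: {"summary": "findings B"})

state: dict[str, Any] = {}
state.update(research_a(state))
state.update(research_b(state))
print(state)
# {'research_a.summary': 'findings A', 'research_b.summary': 'findings B'}
```

Downstream nodes then read the prefixed keys explicitly, which also documents which upstream node produced each value.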
Pitfall 3: Checkpoint Serialization Failures
State contains non-serializable objects (database connections, file handles), causing checkpoint save failures.
import pickle

# Depending on the object and Python version, pickling can fail with any of these.
_PICKLE_ERRORS = (pickle.PicklingError, TypeError, AttributeError)


def safe_serialize_state(state: dict) -> bytes:
    try:
        return pickle.dumps(state)
    except _PICKLE_ERRORS:
        # Fall back to per-key serialization, replacing offenders with a placeholder.
        clean_state = {}
        for key, value in state.items():
            try:
                pickle.dumps(value)
                clean_state[key] = value
            except _PICKLE_ERRORS:
                clean_state[key] = f"<non-serializable: {type(value).__name__}>"
        return pickle.dumps(clean_state)
Solution: Only return JSON-serializable data from handlers, or use a custom serializer.
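If checkpoints are stored as JSON instead of pickle, the `default` hook of `json.dumps` gives the same placeholder behavior in a few lines. This is a minimal sketch assuming string keys; `to_json_safe` and `FakeConnection` are hypothetical names used only for illustration.

```python
import json
from typing import Any


def to_json_safe(state: dict[str, Any]) -> str:
    """Serialize state to JSON, replacing unsupported values with a placeholder string."""

    def fallback(value: Any) -> str:
        # json.dumps calls this only for objects it cannot encode natively.
        return f"<non-serializable: {type(value).__name__}>"

    return json.dumps(state, default=fallback)


class FakeConnection:
    """Stand-in for a database connection or file handle."""


snapshot = to_json_safe({"topic": "dag", "retries": 2, "conn": FakeConnection()})
print(snapshot)
# {"topic": "dag", "retries": 2, "conn": "<non-serializable: FakeConnection>"}
```

The placeholder keeps the checkpoint writable, but the original object is lost on restore, so resources such as connections still need to be recreated by the handler that uses them.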
Pitfall 4: No Matching Branch in Conditional Routing
All conditional edge conditions return False, causing the workflow to stall.
def ensure_default_branch(dag: DAGDefinition) -> DAGDefinition:
    conditional_sources = set()
    for edge in dag.edges:
        if edge.edge_type == EdgeType.CONDITIONAL:
            conditional_sources.add(edge.source_id)
    builder = DAGBuilder(
        f"{dag.workflow_id}-safe", f"{dag.name} (safe)"
    )
    for node in dag.nodes.values():
        builder.add_node(node)
    for edge in dag.edges:
        builder.add_edge(
            edge.source_id, edge.target_id,
            edge.edge_type, edge.condition, edge.condition_name,
        )
    # Give every conditional source without a normal edge a default target.
    for source_id in conditional_sources:
        has_normal = any(
            e.source_id == source_id and e.edge_type == EdgeType.NORMAL
            for e in dag.edges
        )
        if not has_normal:
            builder.add_node(
                NodeDefinition(
                    f"{source_id}_default",
                    NodeType.TRANSFORM,
                    handler=lambda s: {"routed_to_default": True},
                )
            )
            builder.add_edge(source_id, f"{source_id}_default")
    builder.set_entry(dag.entry_node)
    return builder.build()
Solution: Add a default branch for every conditional routing node to ensure at least one path is always executable.
Pitfall 5: Subgraph State Leakage
Sub-workflows modify the parent workflow's state, causing unexpected side effects.
def isolate_sub_workflow_state(
    parent_state: dict, sub_workflow_input_keys: list[str]
) -> tuple[dict, Callable[[dict], dict]]:
    # Pass only the requested keys into the sub-workflow.
    isolated = {k: parent_state[k] for k in sub_workflow_input_keys if k in parent_state}

    def merge_back(sub_state: dict) -> dict:
        # Keep only keys the sub-workflow added; input keys are never written back.
        input_keys = set(sub_workflow_input_keys)
        return {k: v for k, v in sub_state.items() if k not in input_keys}

    return isolated, merge_back
Solution: Only pass necessary keys to sub-workflows, and only merge new keys on return.
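Filtering keys does not protect nested values: a list or dict passed by reference can still be mutated in place by the sub-workflow. The sketch below, which is an illustrative variant and not the engine's API, deep-copies the inputs and merges back only an explicit allow-list of output keys. `run_isolated` and `noisy_sub_workflow` are hypothetical names.

```python
import copy
from typing import Any, Callable


def run_isolated(
    parent_state: dict[str, Any],
    input_keys: list[str],
    output_keys: list[str],
    sub_workflow: Callable[[dict[str, Any]], dict[str, Any]],
) -> dict[str, Any]:
    """Run sub_workflow on a deep copy of selected keys; merge back only output_keys."""
    # Deep copy so in-place mutation of nested values cannot reach the parent.
    isolated = {k: copy.deepcopy(parent_state[k]) for k in input_keys if k in parent_state}
    sub_state = sub_workflow(isolated)
    exported = {k: sub_state[k] for k in output_keys if k in sub_state}
    return {**parent_state, **exported}


def noisy_sub_workflow(state: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical sub-workflow that mutates its input and emits scratch keys."""
    state["sources"].append("cache")  # in-place mutation of a nested list
    return {**state, "scratch": 123, "collected": len(state["sources"])}


parent = {"sources": ["api", "database"], "user": "alice"}
merged = run_isolated(parent, ["sources"], ["collected"], noisy_sub_workflow)
print(merged)
# {'sources': ['api', 'database'], 'user': 'alice', 'collected': 3}
```

Deep copies cost time and memory on large states, so this trade-off suits sub-workflows with small, well-defined inputs.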
10 Common Error Troubleshooting
| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | Cycle detected in DAG | Circular dependency between nodes | Check Edge definitions, remove cycle-forming edges |
| 2 | Unreachable nodes detected | Node has no path from entry | Check for missing Edge connections |
| 3 | Entry node not found | set_entry references non-existent node | Verify node_id spelling |
| 4 | Source/Target node not found | add_edge references non-existent node | add_node before add_edge |
| 5 | Deadlock detected | No matching conditional branch and no default | Add default branch or check condition functions |
| 6 | Node failed after N retries | LLM API consistently timing out or erroring | Check API Key, network, fallback strategy |
| 7 | Sub-workflow failed | Internal sub-workflow node failure | Check sub-workflow node_results for specifics |
| 8 | Checkpoint serialization error | State contains non-serializable objects | Handlers should only return dict[str, Any] |
| 9 | Parallel write conflict | Parallel nodes output to same key | Use namespace isolation for output keys |
| 10 | Runtime cycle via conditional edge | Conditional edges form a loop at runtime | Use validate_conditional_cycles to check |
Advanced Optimization Techniques
1. Async Prefetch: Pre-load Dependencies for Next Level
class PrefetchScheduler(PersistentDAGScheduler):
    async def _run_from_start(self, initial_state=None):
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        completed: set[str] = set()
        pending: set[str] = {self.dag.entry_node}
        start_time = time.time()
        while pending:
            ready = [
                nid for nid in pending
                if self._get_dependencies(nid).issubset(completed)
            ]
            if not ready:
                break
            # Start warm-up tasks for LLM nodes alongside node execution.
            prefetch_tasks = []
            for nid in ready:
                node = self.dag.nodes[nid]
                if node.node_type == NodeType.LLM_CALL:
                    prefetch_tasks.append(
                        asyncio.create_task(self._warmup_llm(nid))
                    )
            tasks = [
                self._execute_node(nid, state, node_results)
                for nid in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            if prefetch_tasks:
                await asyncio.gather(*prefetch_tasks, return_exceptions=True)
            for i, result in enumerate(results):
                node_id = ready[i]
                if isinstance(result, Exception):
                    node_results[node_id] = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                    return WorkflowResult(
                        workflow_id=self.dag.workflow_id,
                        execution_id=execution_id,
                        status="failed",
                        state=state,
                        node_results=node_results,
                        total_time=time.time() - start_time,
                    )
                node_results[node_id] = result
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)
                next_nodes = self._router.resolve_next_nodes(node_id, state)
                for next_id in next_nodes:
                    if next_id not in completed:
                        pending.add(next_id)
        return WorkflowResult(
            workflow_id=self.dag.workflow_id,
            execution_id=execution_id,
            status="completed",
            state=state,
            node_results=node_results,
            total_time=time.time() - start_time,
        )

    async def _warmup_llm(self, node_id: str):
        logger.info(f"Warming up LLM connection for node '{node_id}'")
        await asyncio.sleep(0.01)
2. Timeout Circuit Breaker: Prevent Slow Nodes from Dragging Down the Workflow
class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_max: int = 1,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self._failure_count = 0
        self._last_failure_time: float = 0
        self._state = "closed"
        self._half_open_count = 0

    async def call(self, handler: Callable, state: dict) -> dict:
        if self._state == "open":
            # After the recovery timeout, allow trial calls through (half-open).
            if time.time() - self._last_failure_time > self.recovery_timeout:
                self._state = "half_open"
                self._half_open_count = 0
            else:
                raise RuntimeError("Circuit breaker is OPEN")
        try:
            if asyncio.iscoroutinefunction(handler):
                result = await handler(state)
            else:
                result = await asyncio.to_thread(handler, state)
            if self._state == "half_open":
                self._half_open_count += 1
                if self._half_open_count >= self.half_open_max:
                    self._state = "closed"
                    self._failure_count = 0
            return result
        except Exception:
            self._failure_count += 1
            self._last_failure_time = time.time()
            if self._failure_count >= self.failure_threshold:
                self._state = "open"
            raise
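The breaker above counts failures but does not itself bound how long a handler may run. One way to add that bound, sketched here as an assumption and not as the engine's implementation, is to wrap async handlers with `asyncio.wait_for` so a stalled call raises a timeout error that a breaker can count as a failure. `call_with_timeout`, the two demo handlers, and the 0.05 s limit are illustrative.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncHandler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def call_with_timeout(
    handler: AsyncHandler, state: dict[str, Any], timeout_s: float
) -> dict[str, Any]:
    """Run an async handler, cancelling it if it exceeds timeout_s seconds."""
    # wait_for cancels the handler and raises asyncio.TimeoutError on expiry.
    return await asyncio.wait_for(handler(state), timeout=timeout_s)


async def slow_handler(state: dict[str, Any]) -> dict[str, Any]:
    await asyncio.sleep(1.0)  # simulates a stalled LLM call
    return {"answer": "too late"}


async def fast_handler(state: dict[str, Any]) -> dict[str, Any]:
    return {"answer": "ok"}


async def demo() -> list[str]:
    outcomes = []
    for handler in (fast_handler, slow_handler):
        try:
            result = await call_with_timeout(handler, {}, timeout_s=0.05)
            outcomes.append(result["answer"])
        except asyncio.TimeoutError:
            outcomes.append("timed out")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)  # ['ok', 'timed out']
```

Synchronous handlers run through `asyncio.to_thread` cannot be cancelled this way: the awaiting coroutine times out, but the worker thread keeps running until the handler returns.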
3. DAG Visualization: Auto-generate Mermaid Diagrams
def dag_to_mermaid(dag: DAGDefinition) -> str:
    lines = ["graph TD"]
    for edge in dag.edges:
        style = ""
        if edge.edge_type == EdgeType.CONDITIONAL:
            # Label conditional edges with the condition name.
            style = f"|{edge.condition_name}|"
        lines.append(f"    {edge.source_id} -->{style} {edge.target_id}")
    for nid, node in dag.nodes.items():
        label = f"{nid}\\n({node.node_type.value})"
        lines.append(f"    {nid}[\"{label}\"]")
    return "\n".join(lines)
Paste the generated text into the Mermaid Editor to render the DAG visualization directly.
Comparison: Custom DAG vs LangGraph vs Prefect
| Dimension | Custom DAG Engine | LangGraph | Prefect |
|---|---|---|---|
| Language | Python (extensible) | Python | Python |
| DAG Definition | Declarative Builder | StateGraph | Flow + Task |
| Parallel Execution | ✅ Auto level-based | ✅ Based on asyncio | ✅ Task runners (Dask/Ray via integrations) |
| Conditional Routing | ✅ Conditional edges | ✅ conditional_edges | ✅ Plain Python control flow |
| State Persistence | ✅ CheckpointManager | ✅ Checkpointer | ✅ Result + Storage |
| Checkpoint Recovery | ✅ Native support | ✅ Requires config | ⚠️ DIY |
| Error Recovery | ✅ Retry + fallback | ⚠️ DIY | ✅ Native retry |
| LLM Integration | ⚠️ DIY | ✅ LangChain ecosystem | ⚠️ DIY |
| Visualization | ✅ Mermaid export | ✅ LangGraph Studio | ✅ Prefect UI |
| Learning Curve | Medium | Medium | Low |
| Production Monitoring | ✅ Custom Metrics | ⚠️ LangSmith | ✅ Prefect Cloud |
| Dynamic DAG | ✅ Runtime generation | ✅ Command | ✅ Dynamic tasks |
| Subgraph Nesting | ✅ SubWorkflowNode | ✅ Subgraph | ⚠️ Sub-Flow |
| Community | ❌ Self-maintained | ✅ Active | ✅ Active |
| Best For | Highly customized needs | LangChain users | General task orchestration |
Selection Guide:
- Custom DAG Engine: Deep customization, tight integration with existing systems, extreme performance requirements
- LangGraph: Already in the LangChain ecosystem, rapid prototyping, LLM-native support
- Prefect: General task orchestration, mixed non-LLM workflows, out-of-the-box UI
For more on LangGraph multi-Agent collaboration, see Python LangGraph Multi-Agent Collaboration. For Agent memory architecture, see AI Agent Memory Architecture. For Agent tool usage, see Python AI Agent Tool Use Guide.
Recommended Online Tools
| Tool | Purpose | Link |
|---|---|---|
| JSON Formatter | View and edit DAG definition JSON | /en/json/format |
| Mermaid Editor | Visualize DAG workflow diagrams | /en/dev/mermaid |
| Curl to Code | Quickly generate API call code | /en/dev/curl-to-code |
Summary
The AI Agent workflow DAG engine is the core infrastructure of production-grade AI systems in 2026. This article covered 7 production patterns:
- Task Definition and Dependency Graph Construction — Type-safe DAG Builder with automatic cycle detection and reachability validation
- Topological Sort and Parallel Scheduling — Level-based automatic parallel execution with asyncio concurrency control
- Conditional Routing and Branch Merging — Declarative conditional edges with runtime dynamic routing
- Error Recovery and Retry Strategies — Exponential backoff retry, fallback handlers, circuit breakers
- State Persistence and Checkpoint Recovery — Checkpoint mechanism for crash recovery
- Dynamic DAG and Subgraph Nesting — Runtime subtask generation, sub-workflow encapsulation and reuse
- Production Monitoring and Alerting — Node-level metrics collection, success rate/latency monitoring, alert rules
Core Principle: DAG evolves AI workflows from "hardcoded pipelines" to "declarative orchestration" — it's the essential path from prototype to production for Agent systems.
More AI Agent practical content:
- Python LangGraph Multi-Agent Collaboration: 5 Practical Patterns from State Machines to Workflow Orchestration
- AI Agent Memory Architecture: 4-Layer System from Short-Term Memory to Long-Term Knowledge
- Python AI Agent Tool Use: Complete Guide from Function Calling to MCP Protocol