AIエージェントワークフローDAGエンジン:タスクオーケストレーションから並列実行までの7つの本番パターン
リニアエージェントパイプラインは終わった、DAGこそがAIワークフローの究極の答え
まだ input → process → output のリニアパイプラインでAIエージェントを構築している?3つのエージェントが並列で調査し、2つのエージェントが直列で分析し、最後に1つのエージェントが統合する——そんなタスクはリニアオーケストレーションでは対応できない。2026年、AIエージェントワークフローDAGエンジンは本番システムの標準装備となっている:DAG(有向非巡回グラフ)により、タスク依存、並列スケジューリング、条件ルーティングがハードコードから宣言的設定へと進化した。
本記事の主な収穫:
- DAGワークフローエンジンのコア概念とアーキテクチャを理解
- タスク定義から監視アラートまで7つの本番級DAGオーケストレーションパターンを習得
- 本番環境でそのまま使える完全なPython実装
- 5つのよくある落とし穴と解決策、10のエラートラブルシューティング
- カスタムDAG vs LangGraph vs Prefectの比較分析
目次
- DAGワークフローコア概念
- パターン1:タスク定義と依存グラフ構築
- パターン2:トポロジカルソートと並列スケジューリング
- パターン3:条件ルーティングと分岐マージ
- パターン4:エラー回復とリトライ戦略
- パターン5:状態永続化とチェックポイントリカバリ
- パターン6:動的DAGとサブグラフネスト
- パターン7:本番監視とアラート
- 5つのよくある落とし穴と解決策
- 10のよくあるエラートラブルシューティング
- 高度な最適化テクニック
- 比較分析:カスタムDAG vs LangGraph vs Prefect
- オンラインツール推奨
- まとめ
DAGワークフローコア概念
DAG(Directed Acyclic Graph、有向非巡回グラフ)はAIエージェントワークフローエンジンの数学的基盤である。各ノードはタスク(エージェント呼び出し、ツール実行、データ変換)を表し、各エッジは依存関係を表す。
┌──────────────────────────────────────────────────────────────┐
│ DAGワークフローエンジンアーキテクチャ │
├──────────────────────────────────────────────────────────────┤
│ │
│ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ A │────▶│ B │────▶│ D │ ← 直列依存チェーン │
│ └──┬──┘ └─────┘ └─────┘ │
│ │ ▲ │
│ │ ┌─────┐ │ │
│ └────▶│ C │──────────┘ ← B、C並列、Dは両方を待機 │
│ └──┬──┘ │
│ │ ┌─────┐ │
│ └────────▶│ E │ ← 条件ルーティング:C→E或C→F│
│ └─────┘ │
│ ┌─────┐ │
│ │ F │ ← 条件分岐のもう一つのパス │
│ └─────┘ │
│ │
│ コア保証: │
│ 1. 非巡回 — A→B→C→A の循環依存は存在しない │
│ 2. トポロジカル順序 — 少なくとも1つの正当な実行順序が存在 │
│ 3. 並列度 — 依存関係のないノードは同時実行可能 │
└──────────────────────────────────────────────────────────────┘
主要用語
| 用語 | 説明 |
|---|---|
| Node(ノード) | ワークフロー内の実行単位(LLM呼び出し、ツール実行、データ変換) |
| Edge(エッジ) | ノード間の依存関係。通常エッジと条件エッジに分かれる |
| DAG(有向非巡回グラフ) | ノードとエッジからなるグラフ構造。循環なしを保証 |
| トポロジカルソート | DAGノードを正当な実行順序に並べるアルゴリズム |
| レベル | トポロジカルソート後、同じレベルのノードは並列実行可能 |
| Checkpoint | ワークフロー実行状態のスナップショット。リカバリに使用 |
| 条件ルーティング | 実行時状態に基づき動的に次の実行ノードを選択 |
なぜDAGはリニアパイプラインより優れているのか?
| 側面 | リニアパイプライン | DAGワークフロー |
|---|---|---|
| 並列実行 | ❌ 直列のみ | ✅ 依存のないノードが並列実行 |
| 条件分岐 | ⚠️ ハードコードif-else | ✅ 宣言的条件エッジ |
| エラー回復 | ❌ 最初からやり直し | ✅ チェックポイントリカバリ |
| 可視化 | ⚠️ 理解が困難 | ✅ グラフ構造が直感的 |
| 拡張性 | ❌ 変更が全体に波及 | ✅ 局所的変更、全体の安全性 |
パターン1:タスク定義と依存グラフ構築
AIエージェントワークフローDAGエンジンの第一歩は、タスクノードと依存関係の定義である。Pythonで型安全なDAG定義システムを実装する。
基本データモデル
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
import hashlib
import json
class NodeType(Enum):
LLM_CALL = "llm_call"
TOOL_CALL = "tool_call"
TRANSFORM = "transform"
CONDITION = "condition"
PARALLEL_GROUP = "parallel_group"
SUB_WORKFLOW = "sub_workflow"
HUMAN_APPROVAL = "human_approval"
class EdgeType(Enum):
NORMAL = "normal"
CONDITIONAL = "conditional"
@dataclass
class RetryPolicy:
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 60.0
backoff_factor: float = 2.0
retryable_exceptions: list[type[Exception]] = field(
default_factory=lambda: [Exception]
)
@dataclass
class NodeDefinition:
node_id: str
node_type: NodeType
handler: Callable[..., Any] | None = None
timeout_seconds: float = 300.0
retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
metadata: dict[str, Any] = field(default_factory=dict)
def __hash__(self):
return hash(self.node_id)
def __eq__(self, other):
if isinstance(other, NodeDefinition):
return self.node_id == other.node_id
return False
@dataclass
class EdgeDefinition:
source_id: str
target_id: str
edge_type: EdgeType = EdgeType.NORMAL
condition: Callable[..., bool] | None = None
condition_name: str = ""
def __hash__(self):
return hash((self.source_id, self.target_id, self.condition_name))
DAGビルダー
class DAGBuilder:
def __init__(self, workflow_id: str, name: str = ""):
self.workflow_id = workflow_id
self.name = name
self._nodes: dict[str, NodeDefinition] = {}
self._edges: list[EdgeDefinition] = []
self._entry_node: str | None = None
def add_node(self, node: NodeDefinition) -> DAGBuilder:
if node.node_id in self._nodes:
raise ValueError(f"Node '{node.node_id}' already exists")
self._nodes[node.node_id] = node
return self
def add_edge(
self,
source_id: str,
target_id: str,
edge_type: EdgeType = EdgeType.NORMAL,
condition: Callable[..., bool] | None = None,
condition_name: str = "",
) -> DAGBuilder:
if source_id not in self._nodes:
raise ValueError(f"Source node '{source_id}' not found")
if target_id not in self._nodes:
raise ValueError(f"Target node '{target_id}' not found")
self._edges.append(
EdgeDefinition(
source_id=source_id,
target_id=target_id,
edge_type=edge_type,
condition=condition,
condition_name=condition_name,
)
)
return self
def set_entry(self, node_id: str) -> DAGBuilder:
if node_id not in self._nodes:
raise ValueError(f"Entry node '{node_id}' not found")
self._entry_node = node_id
return self
def build(self) -> DAGDefinition:
if not self._entry_node:
raise ValueError("Entry node not set")
dag = DAGDefinition(
workflow_id=self.workflow_id,
name=self.name,
nodes=dict(self._nodes),
edges=list(self._edges),
entry_node=self._entry_node,
)
dag.validate()
return dag
@dataclass
class DAGDefinition:
workflow_id: str
name: str
nodes: dict[str, NodeDefinition]
edges: list[EdgeDefinition]
entry_node: str
def validate(self):
self._check_cycle()
self._check_reachability()
def _check_cycle(self):
adjacency: dict[str, set[str]] = {
nid: set() for nid in self.nodes
}
for edge in self.edges:
adjacency[edge.source_id].add(edge.target_id)
visited: set[str] = set()
recursion_stack: set[str] = set()
def dfs(node_id: str) -> bool:
visited.add(node_id)
recursion_stack.add(node_id)
for neighbor in adjacency.get(node_id, set()):
if neighbor not in visited:
if dfs(neighbor):
return True
elif neighbor in recursion_stack:
return True
recursion_stack.remove(node_id)
return False
for node_id in self.nodes:
if node_id not in visited:
if dfs(node_id):
raise ValueError(
f"Cycle detected in DAG '{self.workflow_id}'"
)
def _check_reachability(self):
reachable: set[str] = set()
stack = [self.entry_node]
while stack:
current = stack.pop()
if current in reachable:
continue
reachable.add(current)
for edge in self.edges:
if edge.source_id == current:
stack.append(edge.target_id)
unreachable = set(self.nodes.keys()) - reachable
if unreachable:
raise ValueError(
f"Unreachable nodes detected: {unreachable}"
)
def fingerprint(self) -> str:
data = {
"nodes": sorted(self.nodes.keys()),
"edges": [
{"s": e.source_id, "t": e.target_id, "c": e.condition_name}
for e in sorted(
self.edges,
key=lambda e: (e.source_id, e.target_id),
)
],
}
raw = json.dumps(data, sort_keys=True)
return hashlib.sha256(raw.encode()).hexdigest()[:12]
構築例:コンテンツ生成ワークフロー
def fetch_topic(state: dict) -> dict:
return {"topic": state.get("input", "AI技術トレンド")}
def research(state: dict) -> dict:
topic = state["topic"]
return {"research_data": f"{topic}に関する深い研究データ..."}
def analyze(state: dict) -> dict:
data = state["research_data"]
return {"analysis": f"{data}に基づく分析結論..."}
def write_draft(state: dict) -> dict:
analysis = state["analysis"]
return {"draft": f"分析{analysis}に基づく下書き内容..."}
def review(state: dict) -> dict:
draft = state["draft"]
return {"review_result": "approved", "final_content": draft}
def needs_revision(state: dict) -> bool:
return state.get("review_result") == "needs_revision"
def is_approved(state: dict) -> bool:
return state.get("review_result") == "approved"
builder = (
DAGBuilder("content-gen-v1", "コンテンツ生成ワークフロー")
.add_node(NodeDefinition("fetch", NodeType.TRANSFORM, handler=fetch_topic))
.add_node(NodeDefinition("research", NodeType.LLM_CALL, handler=research))
.add_node(NodeDefinition("analyze", NodeType.LLM_CALL, handler=analyze))
.add_node(NodeDefinition("write", NodeType.LLM_CALL, handler=write_draft))
.add_node(NodeDefinition("review", NodeType.LLM_CALL, handler=review))
.add_edge("fetch", "research")
.add_edge("research", "analyze")
.add_edge("analyze", "write")
.add_edge("write", "review")
.add_edge(
"review", "write",
EdgeType.CONDITIONAL,
condition=needs_revision,
condition_name="needs_revision",
)
.set_entry("fetch")
)
dag = builder.build()
print(f"DAG fingerprint: {dag.fingerprint()}")
パターン2:トポロジカルソートと並列スケジューリング
DAGエンジンのコアスケジューリング能力はトポロジカルソートに由来する。ソート後、同じレベルのノードには相互依存がなく、並列実行が可能——これがAIワークフローエンジンのパフォーマンスの鍵である。
トポロジカルソートとレベル計算
from collections import deque
class TopologicalSorter:
def __init__(self, dag: DAGDefinition):
self.dag = dag
self._adjacency: dict[str, set[str]] = {nid: set() for nid in dag.nodes}
self._in_degree: dict[str, int] = {nid: 0 for nid in dag.nodes}
for edge in dag.edges:
if edge.edge_type == EdgeType.NORMAL:
self._adjacency[edge.source_id].add(edge.target_id)
self._in_degree[edge.target_id] += 1
def sort(self) -> list[str]:
in_degree = dict(self._in_degree)
queue = deque(
nid for nid, deg in in_degree.items() if deg == 0
)
result = []
while queue:
node_id = queue.popleft()
result.append(node_id)
for neighbor in self._adjacency[node_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) != len(self.dag.nodes):
raise ValueError("DAG contains a cycle (should have been caught in validation)")
return result
def compute_levels(self) -> dict[str, int]:
levels: dict[str, int] = {}
order = self.sort()
for node_id in order:
max_parent_level = -1
for edge in self.dag.edges:
if edge.target_id == node_id and edge.edge_type == EdgeType.NORMAL:
parent_level = levels.get(edge.source_id, 0)
max_parent_level = max(max_parent_level, parent_level)
levels[node_id] = max_parent_level + 1
return levels
def get_parallel_groups(self) -> list[list[str]]:
levels = self.compute_levels()
max_level = max(levels.values()) if levels else 0
groups: list[list[str]] = []
for level in range(max_level + 1):
group = [nid for nid, lvl in levels.items() if lvl == level]
if group:
groups.append(group)
return groups
並列スケジューラ
import asyncio
import time
from dataclasses import dataclass, field
@dataclass
class NodeResult:
node_id: str
status: str
output: dict = field(default_factory=dict)
error: str | None = None
start_time: float = 0.0
end_time: float = 0.0
retry_count: int = 0
@dataclass
class WorkflowResult:
workflow_id: str
execution_id: str
status: str
state: dict = field(default_factory=dict)
node_results: dict[str, NodeResult] = field(default_factory=dict)
total_time: float = 0.0
class DAGScheduler:
def __init__(self, dag: DAGDefinition, max_concurrency: int = 10):
self.dag = dag
self.max_concurrency = max_concurrency
self._sorter = TopologicalSorter(dag)
self._semaphore = asyncio.Semaphore(max_concurrency)
async def execute(self, initial_state: dict | None = None) -> WorkflowResult:
execution_id = f"exec-{int(time.time() * 1000)}"
state = dict(initial_state or {})
node_results: dict[str, NodeResult] = {}
start_time = time.time()
parallel_groups = self._sorter.get_parallel_groups()
for group in parallel_groups:
tasks = [
self._execute_node(node_id, state, node_results)
for node_id in group
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
node_id = group[i]
if isinstance(result, Exception):
node_results[node_id] = NodeResult(
node_id=node_id,
status="failed",
error=str(result),
)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="failed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
node_results[node_id] = result
state.update(result.output)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="completed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
async def _execute_node(
self,
node_id: str,
state: dict,
node_results: dict[str, NodeResult],
) -> NodeResult:
node = self.dag.nodes[node_id]
start_time = time.time()
async with self._semaphore:
try:
if asyncio.iscoroutinefunction(node.handler):
output = await node.handler(state)
else:
output = await asyncio.to_thread(node.handler, state)
if not isinstance(output, dict):
output = {"result": output}
return NodeResult(
node_id=node_id,
status="completed",
output=output,
start_time=start_time,
end_time=time.time(),
)
except Exception as e:
return NodeResult(
node_id=node_id,
status="failed",
error=str(e),
start_time=start_time,
end_time=time.time(),
)
実行例
async def parallel_research_a(state: dict) -> dict:
await asyncio.sleep(0.1)
return {"research_a": "技術トレンド研究データ"}
async def parallel_research_b(state: dict) -> dict:
await asyncio.sleep(0.1)
return {"research_b": "市場分析研究データ"}
async def merge_research(state: dict) -> dict:
return {
"merged": f"{state.get('research_a', '')} + {state.get('research_b', '')}"
}
builder = (
DAGBuilder("parallel-research", "並列研究ワークフロー")
.add_node(NodeDefinition("start", NodeType.TRANSFORM, handler=lambda s: s))
.add_node(NodeDefinition("research_a", NodeType.LLM_CALL, handler=parallel_research_a))
.add_node(NodeDefinition("research_b", NodeType.LLM_CALL, handler=parallel_research_b))
.add_node(NodeDefinition("merge", NodeType.TRANSFORM, handler=merge_research))
.add_edge("start", "research_a")
.add_edge("start", "research_b")
.add_edge("research_a", "merge")
.add_edge("research_b", "merge")
.set_entry("start")
)
dag = builder.build()
scheduler = DAGScheduler(dag)
result = await scheduler.execute({"input": "AIエージェントワークフローDAGエンジン"})
print(f"Status: {result.status}")
print(f"Total time: {result.total_time:.3f}s")
print(f"Parallel groups: {TopologicalSorter(dag).get_parallel_groups()}")
パターン3:条件ルーティングと分岐マージ
実際のAIワークフローは単一パスではない。エージェント出力、データ品質、ユーザー設定に基づいて動的に実行パスを選択することが、DAGオーケストレーションの中核能力である。
条件ルーターの実装
class ConditionalRouter:
def __init__(self, dag: DAGDefinition):
self.dag = dag
self._conditional_edges: dict[str, list[EdgeDefinition]] = {}
for edge in dag.edges:
if edge.edge_type == EdgeType.CONDITIONAL:
self._conditional_edges.setdefault(edge.source_id, []).append(edge)
def resolve_next_nodes(
self, node_id: str, state: dict
) -> list[str]:
next_nodes: list[str] = []
for edge in self.dag.edges:
if edge.source_id != node_id:
continue
if edge.edge_type == EdgeType.NORMAL:
next_nodes.append(edge.target_id)
elif edge.edge_type == EdgeType.CONDITIONAL:
if edge.condition and edge.condition(state):
next_nodes.append(edge.target_id)
return next_nodes
def get_all_branches(self) -> dict[str, list[str]]:
branches: dict[str, list[str]] = {}
for source_id, edges in self._conditional_edges.items():
branches[source_id] = [
f"{e.condition_name} → {e.target_id}" for e in edges
]
return branches
条件ルーティング付きスケジューラ
class ConditionalDAGScheduler(DAGScheduler):
def __init__(self, dag: DAGDefinition, max_concurrency: int = 10):
super().__init__(dag, max_concurrency)
self._router = ConditionalRouter(dag)
async def execute(self, initial_state: dict | None = None) -> WorkflowResult:
execution_id = f"exec-{int(time.time() * 1000)}"
state = dict(initial_state or {})
node_results: dict[str, NodeResult] = {}
start_time = time.time()
completed: set[str] = set()
pending: set[str] = {self.dag.entry_node}
while pending:
ready: list[str] = []
for node_id in list(pending):
deps = self._get_dependencies(node_id)
if deps.issubset(completed):
ready.append(node_id)
if not ready:
raise RuntimeError(
f"Deadlock detected. Pending: {pending}, Completed: {completed}"
)
tasks = [
self._execute_node(node_id, state, node_results)
for node_id in ready
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
node_id = ready[i]
if isinstance(result, Exception):
node_results[node_id] = NodeResult(
node_id=node_id, status="failed", error=str(result)
)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="failed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
node_results[node_id] = result
state.update(result.output)
completed.add(node_id)
pending.discard(node_id)
next_nodes = self._router.resolve_next_nodes(node_id, state)
for next_id in next_nodes:
if next_id not in completed:
pending.add(next_id)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="completed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
def _get_dependencies(self, node_id: str) -> set[str]:
deps: set[str] = set()
for edge in self.dag.edges:
if edge.target_id == node_id:
deps.add(edge.source_id)
return deps
例:スマートカスタマーサービスルーティング
def classify_intent(state: dict) -> dict:
user_input = state.get("user_input", "")
if "返金" in user_input:
return {"intent": "refund", "confidence": 0.95}
elif "技術" in user_input or "bug" in user_input.lower():
return {"intent": "technical", "confidence": 0.90}
else:
return {"intent": "general", "confidence": 0.80}
def handle_refund(state: dict) -> dict:
return {"response": "返金プロセスを開始しました。3-5営業日で着金予定です"}
def handle_technical(state: dict) -> dict:
return {"response": "テクニカルサポートチームが問題を確認しました。2時間以内に対応します"}
def handle_general(state: dict) -> dict:
return {"response": "お問い合わせありがとうございます。担当者がまもなくご案内します"}
def is_refund(state: dict) -> bool:
return state.get("intent") == "refund"
def is_technical(state: dict) -> bool:
return state.get("intent") == "technical"
def is_general(state: dict) -> bool:
return state.get("intent") == "general"
builder = (
DAGBuilder("customer-service", "スマートカスタマーサービスルーティング")
.add_node(NodeDefinition("classify", NodeType.LLM_CALL, handler=classify_intent))
.add_node(NodeDefinition("refund_handler", NodeType.LLM_CALL, handler=handle_refund))
.add_node(NodeDefinition("tech_handler", NodeType.LLM_CALL, handler=handle_technical))
.add_node(NodeDefinition("general_handler", NodeType.LLM_CALL, handler=handle_general))
.add_node(NodeDefinition("respond", NodeType.TRANSFORM, handler=lambda s: {"final": s.get("response", "")}))
.add_edge("classify", "refund_handler", EdgeType.CONDITIONAL, is_refund, "is_refund")
.add_edge("classify", "tech_handler", EdgeType.CONDITIONAL, is_technical, "is_technical")
.add_edge("classify", "general_handler", EdgeType.CONDITIONAL, is_general, "is_general")
.add_edge("refund_handler", "respond")
.add_edge("tech_handler", "respond")
.add_edge("general_handler", "respond")
.set_entry("classify")
)
dag = builder.build()
scheduler = ConditionalDAGScheduler(dag)
result = await scheduler.execute({"user_input": "返金したいです。商品に欠陥があります"})
print(f"Response: {result.state.get('final', '')}")
パターン4:エラー回復とリトライ戦略
AIワークフローでは、LLM呼び出しやAPIリクエストがいつでも失敗する可能性がある。リトライとエラー回復のないDAGエンジンは本番環境では受け入れられない。
リトライエグゼキューターの実装
import random
import logging
logger = logging.getLogger(__name__)
class RetryExecutor:
def __init__(self, retry_policy: RetryPolicy):
self.policy = retry_policy
async def execute_with_retry(
self,
handler: Callable[..., Any],
state: dict,
node_id: str,
) -> NodeResult:
last_error: Exception | None = None
retry_count = 0
for attempt in range(self.policy.max_retries + 1):
try:
if asyncio.iscoroutinefunction(handler):
output = await handler(state)
else:
output = await asyncio.to_thread(handler, state)
if not isinstance(output, dict):
output = {"result": output}
return NodeResult(
node_id=node_id,
status="completed",
output=output,
retry_count=retry_count,
)
except tuple(self.policy.retryable_exceptions) as e:
last_error = e
retry_count += 1
if attempt < self.policy.max_retries:
delay = min(
self.policy.base_delay
* (self.policy.backoff_factor ** attempt),
self.policy.max_delay,
)
jitter = random.uniform(0, delay * 0.1)
logger.warning(
f"Node '{node_id}' failed (attempt {attempt + 1}/"
f"{self.policy.max_retries + 1}), "
f"retrying in {delay + jitter:.2f}s: {e}"
)
await asyncio.sleep(delay + jitter)
except Exception as e:
last_error = e
break
return NodeResult(
node_id=node_id,
status="failed",
error=str(last_error),
retry_count=retry_count,
)
リトライとフォールバック付きスケジューラ
class ResilientDAGScheduler(ConditionalDAGScheduler):
def __init__(
self,
dag: DAGDefinition,
max_concurrency: int = 10,
fallback_handlers: dict[str, Callable] | None = None,
):
super().__init__(dag, max_concurrency)
self._fallback_handlers = fallback_handlers or {}
async def _execute_node(
self,
node_id: str,
state: dict,
node_results: dict[str, NodeResult],
) -> NodeResult:
node = self.dag.nodes[node_id]
retry_executor = RetryExecutor(node.retry_policy)
result = await retry_executor.execute_with_retry(
node.handler, state, node_id
)
if result.status == "failed" and node_id in self._fallback_handlers:
logger.info(f"Node '{node_id}' failed, executing fallback handler")
try:
fallback = self._fallback_handlers[node_id]
if asyncio.iscoroutinefunction(fallback):
output = await fallback(state)
else:
output = await asyncio.to_thread(fallback, state)
if not isinstance(output, dict):
output = {"result": output}
return NodeResult(
node_id=node_id,
status="completed_with_fallback",
output=output,
retry_count=result.retry_count,
)
except Exception as fallback_error:
result.error = f"Primary: {result.error}; Fallback: {fallback_error}"
return result
使用例
async def call_llm_with_retry(state: dict) -> dict:
if random.random() < 0.5:
raise ConnectionError("LLM API timeout")
return {"llm_response": "分析結果..."}
def fallback_llm(state: dict) -> dict:
return {"llm_response": "フォールバック:キャッシュ結果を使用"}
builder = (
DAGBuilder("resilient-workflow", "耐障害ワークフロー")
.add_node(
NodeDefinition(
"llm_call",
NodeType.LLM_CALL,
handler=call_llm_with_retry,
retry_policy=RetryPolicy(
max_retries=3,
base_delay=0.5,
retryable_exceptions=[ConnectionError, TimeoutError],
),
)
)
.set_entry("llm_call")
)
dag = builder.build()
scheduler = ResilientDAGScheduler(
dag,
fallback_handlers={"llm_call": fallback_llm},
)
result = await scheduler.execute({"input": "test"})
print(f"Status: {result.status}")
パターン5:状態永続化とチェックポイントリカバリ
長時間実行されるAIワークフロー(マルチラウンドエージェント協調、大規模データ処理など)では状態永続化が必須である。実行途中でクラッシュしてもチェックポイントからリカバリでき、最初からやり直す必要がない。
チェックポイントマネージャー
import json
from pathlib import Path
from datetime import datetime
class CheckpointManager:
def __init__(self, storage_dir: str = ".checkpoints"):
self._storage = Path(storage_dir)
self._storage.mkdir(parents=True, exist_ok=True)
def save(
self,
workflow_id: str,
execution_id: str,
state: dict,
completed_nodes: set[str],
pending_nodes: set[str],
node_results: dict[str, NodeResult],
) -> str:
checkpoint_id = f"cp-{int(time.time() * 1000)}"
checkpoint_data = {
"checkpoint_id": checkpoint_id,
"workflow_id": workflow_id,
"execution_id": execution_id,
"state": state,
"completed_nodes": list(completed_nodes),
"pending_nodes": list(pending_nodes),
"node_results": {
nid: {
"node_id": r.node_id,
"status": r.status,
"output": r.output,
"error": r.error,
"retry_count": r.retry_count,
}
for nid, r in node_results.items()
},
"saved_at": datetime.now().isoformat(),
}
filepath = self._storage / f"{workflow_id}_{execution_id}.json"
with open(filepath, "w", encoding="utf-8") as f:
json.dump(checkpoint_data, f, ensure_ascii=False, indent=2)
return checkpoint_id
def load(
self, workflow_id: str, execution_id: str
) -> dict | None:
filepath = self._storage / f"{workflow_id}_{execution_id}.json"
if not filepath.exists():
return None
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
def list_checkpoints(self, workflow_id: str) -> list[dict]:
checkpoints = []
for fp in self._storage.glob(f"{workflow_id}_*.json"):
with open(fp, "r", encoding="utf-8") as f:
data = json.load(f)
checkpoints.append({
"execution_id": data["execution_id"],
"saved_at": data["saved_at"],
"completed": len(data["completed_nodes"]),
"pending": len(data["pending_nodes"]),
})
return sorted(checkpoints, key=lambda x: x["saved_at"], reverse=True)
def cleanup(self, workflow_id: str, keep_last: int = 5):
checkpoints = self.list_checkpoints(workflow_id)
for cp in checkpoints[keep_last:]:
filepath = self._storage / f"{workflow_id}_{cp['execution_id']}.json"
filepath.unlink(missing_ok=True)
チェックポイントリカバリ対応スケジューラ
class PersistentDAGScheduler(ResilientDAGScheduler):
def __init__(
self,
dag: DAGDefinition,
checkpoint_manager: CheckpointManager,
max_concurrency: int = 10,
checkpoint_interval: int = 1,
fallback_handlers: dict[str, Callable] | None = None,
):
super().__init__(dag, max_concurrency, fallback_handlers)
self._checkpoint_mgr = checkpoint_manager
self._checkpoint_interval = checkpoint_interval
async def execute(
self,
initial_state: dict | None = None,
execution_id: str | None = None,
) -> WorkflowResult:
if execution_id:
return await self._resume(execution_id, initial_state)
return await self._run_from_start(initial_state)
async def _run_from_start(
self, initial_state: dict | None = None
) -> WorkflowResult:
execution_id = f"exec-{int(time.time() * 1000)}"
state = dict(initial_state or {})
node_results: dict[str, NodeResult] = {}
completed: set[str] = set()
pending: set[str] = {self.dag.entry_node}
start_time = time.time()
steps_since_checkpoint = 0
while pending:
ready = [
nid for nid in pending
if self._get_dependencies(nid).issubset(completed)
]
if not ready:
raise RuntimeError("Deadlock in DAG execution")
tasks = [
self._execute_node(nid, state, node_results)
for nid in ready
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
node_id = ready[i]
if isinstance(result, Exception):
node_results[node_id] = NodeResult(
node_id=node_id, status="failed", error=str(result)
)
self._checkpoint_mgr.save(
self.dag.workflow_id, execution_id,
state, completed, pending, node_results,
)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="failed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
node_results[node_id] = result
state.update(result.output)
completed.add(node_id)
pending.discard(node_id)
next_nodes = self._router.resolve_next_nodes(node_id, state)
for next_id in next_nodes:
if next_id not in completed:
pending.add(next_id)
steps_since_checkpoint += 1
if steps_since_checkpoint >= self._checkpoint_interval:
self._checkpoint_mgr.save(
self.dag.workflow_id, execution_id,
state, completed, pending, node_results,
)
steps_since_checkpoint = 0
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="completed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
async def _resume(
self,
execution_id: str,
initial_state: dict | None = None,
) -> WorkflowResult:
checkpoint = self._checkpoint_mgr.load(
self.dag.workflow_id, execution_id
)
if not checkpoint:
raise ValueError(
f"No checkpoint found for {self.dag.workflow_id}/{execution_id}"
)
state = checkpoint["state"]
if initial_state:
state.update(initial_state)
completed = set(checkpoint["completed_nodes"])
pending = set(checkpoint["pending_nodes"])
node_results = {
nid: NodeResult(
node_id=r["node_id"],
status=r["status"],
output=r["output"],
error=r.get("error"),
retry_count=r.get("retry_count", 0),
)
for nid, r in checkpoint["node_results"].items()
}
failed_nodes = {
nid for nid, r in node_results.items() if r.status == "failed"
}
pending.update(failed_nodes)
start_time = time.time()
steps_since_checkpoint = 0
while pending:
ready = [
nid for nid in pending
if self._get_dependencies(nid).issubset(completed)
]
if not ready:
break
tasks = [
self._execute_node(nid, state, node_results)
for nid in ready
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
node_id = ready[i]
if isinstance(result, Exception):
node_results[node_id] = NodeResult(
node_id=node_id, status="failed", error=str(result)
)
self._checkpoint_mgr.save(
self.dag.workflow_id, execution_id,
state, completed, pending, node_results,
)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="failed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
node_results[node_id] = result
state.update(result.output)
completed.add(node_id)
pending.discard(node_id)
next_nodes = self._router.resolve_next_nodes(node_id, state)
for next_id in next_nodes:
if next_id not in completed:
pending.add(next_id)
steps_since_checkpoint += 1
if steps_since_checkpoint >= self._checkpoint_interval:
self._checkpoint_mgr.save(
self.dag.workflow_id, execution_id,
state, completed, pending, node_results,
)
steps_since_checkpoint = 0
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="completed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
パターン6:動的DAGとサブグラフネスト
本番環境ではDAGは静的ではない。実行時データに基づいて動的にサブタスクを生成し、サブワークフローをネストすることが高度なオーケストレーションの鍵となる。
動的DAG生成
class DynamicDAGGenerator:
def __init__(self, base_dag: DAGDefinition):
self.base_dag = base_dag
def generate_dynamic_nodes(
self,
state: dict,
dynamic_node_factory: Callable[[dict], list[NodeDefinition]],
dependency_resolver: Callable[[list[NodeDefinition], dict], list[EdgeDefinition]],
) -> tuple[list[NodeDefinition], list[EdgeDefinition]]:
new_nodes = dynamic_node_factory(state)
new_edges = dependency_resolver(new_nodes, state)
return new_nodes, new_edges
def merge_into_base(
self,
new_nodes: list[NodeDefinition],
new_edges: list[EdgeDefinition],
attach_after: str,
) -> DAGDefinition:
builder = DAGBuilder(
f"{self.base_dag.workflow_id}-dynamic",
f"{self.base_dag.name} (dynamic)",
)
for node in self.base_dag.nodes.values():
builder.add_node(node)
for edge in self.base_dag.edges:
builder.add_edge(
edge.source_id, edge.target_id,
edge.edge_type, edge.condition, edge.condition_name,
)
for node in new_nodes:
builder.add_node(node)
for edge in new_edges:
builder.add_edge(
edge.source_id, edge.target_id,
edge.edge_type, edge.condition, edge.condition_name,
)
builder.set_entry(self.base_dag.entry_node)
return builder.build()
サブグラフネスト
class SubWorkflowNode:
def __init__(
self,
sub_dag: DAGDefinition,
scheduler_class: type = ConditionalDAGScheduler,
max_concurrency: int = 5,
):
self.sub_dag = sub_dag
self._scheduler_class = scheduler_class
self._max_concurrency = max_concurrency
async def execute(self, state: dict) -> dict:
scheduler = self._scheduler_class(
self.sub_dag, max_concurrency=self._max_concurrency
)
result = await scheduler.execute(state)
if result.status != "completed":
raise RuntimeError(
f"Sub-workflow '{self.sub_dag.workflow_id}' failed: "
f"{[r.error for r in result.node_results.values() if r.error]}"
)
return result.state
def create_sub_workflow_node(
node_id: str,
sub_dag: DAGDefinition,
max_concurrency: int = 5,
) -> NodeDefinition:
sub_executor = SubWorkflowNode(sub_dag, max_concurrency=max_concurrency)
return NodeDefinition(
node_id=node_id,
node_type=NodeType.SUB_WORKFLOW,
handler=sub_executor.execute,
metadata={"sub_workflow_id": sub_dag.workflow_id},
)
例:マルチソースデータ収集
def create_data_source_nodes(state: dict) -> list[NodeDefinition]:
sources = state.get("data_sources", ["api", "database", "file"])
nodes = []
for source in sources:
async def fetch_data(s: dict, src=source) -> dict:
await asyncio.sleep(0.1)
return {f"{src}_data": f"{src}からのデータ"}
nodes.append(
NodeDefinition(
f"fetch_{source}",
NodeType.TOOL_CALL,
handler=fetch_data,
)
)
return nodes
def resolve_dynamic_edges(
new_nodes: list[NodeDefinition], state: dict
) -> list[EdgeDefinition]:
edges = []
for node in new_nodes:
edges.append(EdgeDefinition(source_id="start", target_id=node.node_id))
edges.append(EdgeDefinition(source_id=node.node_id, target_id="aggregate"))
return edges
sub_builder = (
DAGBuilder("data-collection", "データ収集サブグラフ")
.add_node(NodeDefinition("start", NodeType.TRANSFORM, handler=lambda s: s))
.add_node(NodeDefinition("aggregate", NodeType.TRANSFORM, handler=lambda s: {"aggregated": "全データ統合"}))
.set_entry("start")
)
sub_dag = sub_builder.build()
builder = (
DAGBuilder("main-workflow", "メインワークフロー")
.add_node(NodeDefinition("plan", NodeType.LLM_CALL, handler=lambda s: {**s, "data_sources": ["api", "database", "file"]}))
.add_node(create_sub_workflow_node("collect", sub_dag))
.add_node(NodeDefinition("report", NodeType.LLM_CALL, handler=lambda s: {"report": "最終レポート"}))
.add_edge("plan", "collect")
.add_edge("collect", "report")
.set_entry("plan")
)
main_dag = builder.build()
scheduler = ConditionalDAGScheduler(main_dag)
result = await scheduler.execute({"input": "データ収集レポートを生成"})
パターン7:本番監視とアラート
AIエージェントワークフローDAGエンジンが本番稼働すると、監視が運用の生命線となる。各ノードの実行時間、成功率、エラー分布を把握する必要がある。
メトリクスコレクター
from collections import defaultdict
import statistics
@dataclass
class NodeMetrics:
node_id: str
total_executions: int = 0
success_count: int = 0
failure_count: int = 0
fallback_count: int = 0
total_retry_count: int = 0
execution_times: list[float] = field(default_factory=list)
@property
def success_rate(self) -> float:
if self.total_executions == 0:
return 0.0
return self.success_count / self.total_executions
@property
def avg_execution_time(self) -> float:
if not self.execution_times:
return 0.0
return statistics.mean(self.execution_times)
@property
def p95_execution_time(self) -> float:
if len(self.execution_times) < 2:
return self.avg_execution_time
sorted_times = sorted(self.execution_times)
idx = int(len(sorted_times) * 0.95)
return sorted_times[min(idx, len(sorted_times) - 1)]
@property
def p99_execution_time(self) -> float:
if len(self.execution_times) < 2:
return self.avg_execution_time
sorted_times = sorted(self.execution_times)
idx = int(len(sorted_times) * 0.99)
return sorted_times[min(idx, len(sorted_times) - 1)]
class MetricsCollector:
def __init__(self):
self._node_metrics: dict[str, NodeMetrics] = defaultdict(
lambda: NodeMetrics(node_id="")
)
self._workflow_count = 0
self._workflow_success = 0
self._workflow_failure = 0
def record_node_result(self, result: NodeResult):
metrics = self._node_metrics[result.node_id]
metrics.node_id = result.node_id
metrics.total_executions += 1
metrics.total_retry_count += result.retry_count
if result.status == "completed":
metrics.success_count += 1
elif result.status == "completed_with_fallback":
metrics.fallback_count += 1
metrics.success_count += 1
else:
metrics.failure_count += 1
exec_time = result.end_time - result.start_time
if exec_time > 0:
metrics.execution_times.append(exec_time)
def record_workflow_result(self, result: WorkflowResult):
self._workflow_count += 1
if result.status == "completed":
self._workflow_success += 1
else:
self._workflow_failure += 1
for node_result in result.node_results.values():
self.record_node_result(node_result)
def get_node_metrics(self, node_id: str) -> NodeMetrics | None:
return self._node_metrics.get(node_id)
def get_all_metrics(self) -> dict[str, NodeMetrics]:
return dict(self._node_metrics)
def summary(self) -> dict:
return {
"total_workflows": self._workflow_count,
"success_workflows": self._workflow_success,
"failed_workflows": self._workflow_failure,
"workflow_success_rate": (
self._workflow_success / self._workflow_count
if self._workflow_count > 0
else 0.0
),
"nodes": {
nid: {
"success_rate": f"{m.success_rate:.2%}",
"avg_time": f"{m.avg_execution_time:.3f}s",
"p95_time": f"{m.p95_execution_time:.3f}s",
"p99_time": f"{m.p99_execution_time:.3f}s",
"total_retries": m.total_retry_count,
"fallback_count": m.fallback_count,
}
for nid, m in self._node_metrics.items()
},
}
アラートルール
class AlertRule:
def __init__(
self,
name: str,
condition: Callable[[NodeMetrics], bool],
severity: str = "warning",
message_template: str = "",
):
self.name = name
self.condition = condition
self.severity = severity
self.message_template = message_template
def check(self, metrics: NodeMetrics) -> str | None:
if self.condition(metrics):
return self.message_template.format(
node_id=metrics.node_id,
success_rate=f"{metrics.success_rate:.2%}",
avg_time=f"{metrics.avg_execution_time:.3f}s",
)
return None
class AlertManager:
def __init__(self):
self._rules: list[AlertRule] = []
self._alerts: list[dict] = []
def add_rule(self, rule: AlertRule):
self._rules.append(rule)
def check_metrics(self, metrics_collector: MetricsCollector):
for node_id, metrics in metrics_collector.get_all_metrics().items():
for rule in self._rules:
alert_msg = rule.check(metrics)
if alert_msg:
self._alerts.append({
"rule": rule.name,
"severity": rule.severity,
"node_id": node_id,
"message": alert_msg,
"timestamp": datetime.now().isoformat(),
})
def get_alerts(self, severity: str | None = None) -> list[dict]:
if severity:
return [a for a in self._alerts if a["severity"] == severity]
return list(self._alerts)
alert_mgr = AlertManager()
alert_mgr.add_rule(AlertRule(
name="low_success_rate",
condition=lambda m: m.total_executions >= 5 and m.success_rate < 0.8,
severity="critical",
message_template="Node {node_id} success rate {success_rate} below 80%",
))
alert_mgr.add_rule(AlertRule(
name="high_latency",
condition=lambda m: m.avg_execution_time > 30.0,
severity="warning",
message_template="Node {node_id} avg execution time {avg_time} exceeds 30s",
))
alert_mgr.add_rule(AlertRule(
name="high_retry_rate",
condition=lambda m: m.total_executions > 0
and m.total_retry_count / m.total_executions > 2.0,
severity="warning",
message_template="Node {node_id} has high retry rate, avg retries per execution > 2",
))
5つのよくある落とし穴と解決策
落とし穴1:条件エッジの循環検出漏れ
条件エッジは実行時にのみ有効化されるため、静的循環検出が実行時のループを見逃す可能性がある。
def validate_conditional_cycles(dag: DAGDefinition):
all_edges = list(dag.edges)
for edge in all_edges:
if edge.edge_type == EdgeType.CONDITIONAL:
test_edges = [
e for e in all_edges
if not (e.source_id == edge.source_id
and e.target_id == edge.target_id
and e.edge_type == EdgeType.CONDITIONAL)
]
test_edges.append(EdgeDefinition(
source_id=edge.source_id,
target_id=edge.target_id,
edge_type=EdgeType.NORMAL,
))
test_dag = DAGDefinition(
workflow_id=dag.workflow_id + "-test",
name=dag.name,
nodes=dag.nodes,
edges=test_edges,
entry_node=dag.entry_node,
)
try:
test_dag._check_cycle()
except ValueError:
raise ValueError(
f"Conditional edge '{edge.source_id}' → '{edge.target_id}' "
f"may create a runtime cycle"
)
解決策:すべての条件エッジについて「有効化されたと仮定」の循環検出を行い、いかなる条件の組み合わせでもループが発生しないことを確認する。
落とし穴2:並列ノードの書き込み競合
複数の並列ノードが同時にstate内の同じキーを変更し、データが上書きされる。
def validate_parallel_write_safety(dag: DAGDefinition):
levels = TopologicalSorter(dag).compute_levels()
level_groups: dict[int, list[str]] = {}
for nid, level in levels.items():
level_groups.setdefault(level, []).append(nid)
for level, nodes in level_groups.items():
if len(nodes) <= 1:
continue
output_keys: dict[str, list[str]] = {}
for nid in nodes:
node = dag.nodes[nid]
keys = node.metadata.get("output_keys", [])
for key in keys:
output_keys.setdefault(key, []).append(nid)
conflicts = {k: v for k, v in output_keys.items() if len(v) > 1}
if conflicts:
raise ValueError(
f"Parallel write conflict at level {level}: {conflicts}"
)
解決策:DAG検証時に並列ノードの出力キーの競合を確認するか、名前空間で分離する。
落とし穴3:チェックポイントのシリアライズ失敗
stateにシリアライズ不可能なオブジェクト(DB接続、ファイルハンドルなど)が含まれ、チェックポイント保存が失敗する。
import pickle
def safe_serialize_state(state: dict) -> bytes:
try:
return pickle.dumps(state)
except (pickle.PicklingError, TypeError) as e:
clean_state = {}
for key, value in state.items():
try:
pickle.dumps(value)
clean_state[key] = value
except (pickle.PicklingError, TypeError):
clean_state[key] = f"<non-serializable: {type(value).__name__}>"
return pickle.dumps(clean_state)
解決策:ハンドラーからはJSONシリアライズ可能なデータのみを返すか、カスタムシリアライザーを使用する。
落とし穴4:条件ルーティングのマッチングなし
すべての条件エッジのconditionがFalseを返し、ワークフローが停止する。
def ensure_default_branch(dag: DAGDefinition) -> DAGDefinition:
conditional_sources = set()
for edge in dag.edges:
if edge.edge_type == EdgeType.CONDITIONAL:
conditional_sources.add(edge.source_id)
builder = DAGBuilder(
f"{dag.workflow_id}-safe", f"{dag.name} (safe)"
)
for node in dag.nodes.values():
builder.add_node(node)
for edge in dag.edges:
builder.add_edge(
edge.source_id, edge.target_id,
edge.edge_type, edge.condition, edge.condition_name,
)
for source_id in conditional_sources:
has_normal = any(
e.source_id == source_id and e.edge_type == EdgeType.NORMAL
for e in dag.edges
)
if not has_normal:
builder.add_node(
NodeDefinition(
f"{source_id}_default",
NodeType.TRANSFORM,
handler=lambda s: {"routed_to_default": True},
)
)
builder.add_edge(source_id, f"{source_id}_default")
builder.set_entry(dag.entry_node)
return builder.build()
解決策:すべての条件ルーティングノードにデフォルトブランチを追加し、少なくとも1つのパスが常に実行可能であることを保証する。
落とし穴5:サブグラフの状態漏洩
サブワークフローが親ワークフローのstateを変更し、予期しない副作用を引き起こす。
def isolate_sub_workflow_state(
parent_state: dict, sub_workflow_input_keys: list[str]
) -> tuple[dict, Callable[[dict], dict]]:
isolated = {k: parent_state[k] for k in sub_workflow_input_keys if k in parent_state}
def merge_back(sub_state: dict) -> dict:
output_keys = set(sub_workflow_input_keys)
return {k: v for k, v in sub_state.items() if k not in output_keys}
return isolated, merge_back
解決策:サブワークフロー実行時に必要なキーのみを渡し、返却時は新しいキーのみをマージする。
10のよくあるエラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決策 |
|---|---|---|---|
| 1 | Cycle detected in DAG |
ノード間に循環依存がある | Edge定義を確認し、ループを形成するエッジを削除 |
| 2 | Unreachable nodes detected |
エントリから到達可能なパスがない | Edge接続の欠落を確認 |
| 3 | Entry node not found |
set_entryで指定したノードが存在しない | node_idのスペルを確認 |
| 4 | Source/Target node not found |
add_edgeが存在しないノードを参照 | add_nodeをadd_edgeの前に実行 |
| 5 | Deadlock detected |
条件ルーティングにマッチなし、デフォルトなし | デフォルトブランチを追加または条件関数を確認 |
| 6 | Node failed after N retries |
LLM APIが継続的にタイムアウトまたはエラー | API Key、ネットワーク、フォールバック戦略を確認 |
| 7 | Sub-workflow failed |
サブワークフロー内部ノードの失敗 | サブワークフローのnode_resultsで特定 |
| 8 | Checkpoint serialization error |
stateにシリアライズ不可能なオブジェクト | ハンドラーはdict[str, Any]のみ返す |
| 9 | Parallel write conflict |
並列ノードの出力キーが競合 | 名前空間で出力キーを分離 |
| 10 | Runtime cycle via conditional edge |
条件エッジが実行時にループを形成 | validate_conditional_cyclesで確認 |
高度な最適化テクニック
1. 非同期プリフェッチ:次レベルのノード依存を事前ロード
class PrefetchScheduler(PersistentDAGScheduler):
async def _run_from_start(self, initial_state=None):
execution_id = f"exec-{int(time.time() * 1000)}"
state = dict(initial_state or {})
node_results: dict[str, NodeResult] = {}
completed: set[str] = set()
pending: set[str] = {self.dag.entry_node}
start_time = time.time()
while pending:
ready = [
nid for nid in pending
if self._get_dependencies(nid).issubset(completed)
]
if not ready:
break
prefetch_tasks = []
for nid in ready:
node = self.dag.nodes[nid]
if node.node_type == NodeType.LLM_CALL:
prefetch_tasks.append(
asyncio.create_task(self._warmup_llm(nid))
)
tasks = [
self._execute_node(nid, state, node_results)
for nid in ready
]
results = await asyncio.gather(*tasks, return_exceptions=True)
if prefetch_tasks:
await asyncio.gather(*prefetch_tasks, return_exceptions=True)
for i, result in enumerate(results):
node_id = ready[i]
if isinstance(result, Exception):
node_results[node_id] = NodeResult(
node_id=node_id, status="failed", error=str(result)
)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="failed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
node_results[node_id] = result
state.update(result.output)
completed.add(node_id)
pending.discard(node_id)
next_nodes = self._router.resolve_next_nodes(node_id, state)
for next_id in next_nodes:
if next_id not in completed:
pending.add(next_id)
return WorkflowResult(
workflow_id=self.dag.workflow_id,
execution_id=execution_id,
status="completed",
state=state,
node_results=node_results,
total_time=time.time() - start_time,
)
async def _warmup_llm(self, node_id: str):
logger.info(f"Warming up LLM connection for node '{node_id}'")
await asyncio.sleep(0.01)
2. タイムアウトサーキットブレーカー:遅いノードがワークフロー全体を引きずらないようにする
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
half_open_max: int = 1,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max = half_open_max
self._failure_count = 0
self._last_failure_time: float = 0
self._state = "closed"
self._half_open_count = 0
async def call(self, handler: Callable, state: dict) -> dict:
if self._state == "open":
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = "half_open"
self._half_open_count = 0
else:
raise RuntimeError("Circuit breaker is OPEN")
try:
result = await handler(state) if asyncio.iscoroutinefunction(handler) else await asyncio.to_thread(handler, state)
if self._state == "half_open":
self._half_open_count += 1
if self._half_open_count >= self.half_open_max:
self._state = "closed"
self._failure_count = 0
return result
except Exception as e:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = "open"
raise
3. DAG可視化:Mermaid図の自動生成
def dag_to_mermaid(dag: DAGDefinition) -> str:
lines = ["graph TD"]
for edge in dag.edges:
style = ""
if edge.edge_type == EdgeType.CONDITIONAL:
style = f"|{edge.condition_name}|"
lines.append(f" {edge.source_id} -->{style} {edge.target_id}")
for nid, node in dag.nodes.items():
label = f"{nid}\\n({node.node_type.value})"
lines.append(f" {nid}[\"{label}\"]")
return "\n".join(lines)
オンラインMermaidエディタを使ってDAG可視化図を直接レンダリングできる。
比較分析:カスタムDAG vs LangGraph vs Prefect
| 側面 | カスタムDAGエンジン | LangGraph | Prefect |
|---|---|---|---|
| 言語 | Python(拡張可能) | Python | Python |
| DAG定義 | 宣言的Builder | StateGraph | Flow + Task |
| 並列実行 | ✅ 自動レベルベース | ✅ asyncioベース | ✅ ネイティブDask/Ray |
| 条件ルーティング | ✅ 条件エッジ | ✅ conditional_edges | ✅ branch |
| 状態永続化 | ✅ CheckpointManager | ✅ Checkpointer | ✅ Result + Storage |
| チェックポイントリカバリ | ✅ ネイティブサポート | ✅ 設定必要 | ⚠️ 自前構築 |
| エラー回復 | ✅ リトライ+フォールバック | ⚠️ 自前構築 | ✅ ネイティブリトライ |
| LLM統合 | ⚠️ 自前構築 | ✅ LangChainエコシステム | ⚠️ 自前構築 |
| 可視化 | ✅ Mermaidエクスポート | ✅ LangGraph Studio | ✅ Prefect UI |
| 学習曲線 | 中程度 | 中程度 | 低い |
| 本番監視 | ✅ カスタムMetrics | ⚠️ LangSmith | ✅ Prefect Cloud |
| 動的DAG | ✅ 実行時生成 | ✅ Command | ✅ 動的タスク |
| サブグラフネスト | ✅ SubWorkflowNode | ✅ Subgraph | ⚠️ サブFlow |
| コミュニティ | ❌ 自己保守 | ✅ 活発 | ✅ 活発 |
| 適用シナリオ | 高度なカスタマイズ要件 | LangChainユーザー | 汎用タスクオーケストレーション |
選定ガイド:
- カスタムDAGエンジン:深いカスタマイズ、既存システムとの緊密な統合、極限のパフォーマンス要件
- LangGraph:LangChainエコシステム内、迅速なプロトタイピング、LLMネイティブサポート重視
- Prefect:汎用タスクオーケストレーション、LLM以外の混合ワークフロー、すぐに使えるUI
LangGraphマルチエージェント協調について詳しくは、Python LangGraphマルチエージェント協調実践を参照。エージェントメモリアーキテクチャについては、AIエージェントメモリアーキテクチャ設計を参照。エージェントツール使用については、Python AIエージェントツール使用ガイドを参照。
オンラインツール推奨
| ツール | 用途 | リンク |
|---|---|---|
| JSONフォーマッター | DAG定義JSONの表示・編集 | /ja/json/format |
| Mermaidエディタ | DAGワークフロー図の可視化 | /ja/dev/mermaid |
| Curl→コード変換 | API呼び出しコードの迅速生成 | /ja/dev/curl-to-code |
まとめ
AIエージェントワークフローDAGエンジンは、2026年の本番級AIシステムのコアインフラである。本記事では7つの本番パターンを網羅した:
- タスク定義と依存グラフ構築 — 型安全なDAG Builder、自動循環検出と到達性検証
- トポロジカルソートと並列スケジューリング — レベルベースの自動並列実行、asyncio並行制御
- 条件ルーティングと分岐マージ — 宣言的条件エッジ、実行時動的ルーティング
- エラー回復とリトライ戦略 — 指数バックオフリトライ、フォールバックハンドラー、サーキットブレーカー
- 状態永続化とチェックポイントリカバリ — チェックポイント機構、クラッシュ後のリカバリ
- 動的DAGとサブグラフネスト — 実行時サブタスク生成、サブワークフローのカプセル化と再利用
- 本番監視とアラート — ノードレベルのメトリクス収集、成功率/レイテンシ監視、アラートルール
コア原則:DAGはAIワークフローを「ハードコードパイプライン」から「宣言的オーケストレーション」へと進化させる——エージェントシステムがプロトタイプから本番へ移行するための必須の道である。
他のAIエージェント実践記事:
- Python LangGraphマルチエージェント協調:ステートマシンからワークフロー編成までの5つの実践パターン
- AIエージェントメモリアーキテクチャ:短期記憶から長期知識までの4層システム
- Python AIエージェントツール使用:Function CallingからMCPプロトコルまでの完全ガイド
外部参考:
ブラウザローカルツールを無料で試す →