AIエージェントワークフローDAGエンジン:タスクオーケストレーションから並列実行までの7つの本番パターン

AI与大数据

リニアエージェントパイプラインは終わった、DAGこそがAIワークフローの究極の答え

まだ input → process → output のリニアパイプラインでAIエージェントを構築している?3つのエージェントが並列で調査し、2つのエージェントが直列で分析し、最後に1つのエージェントが統合する——そんなタスクはリニアオーケストレーションでは対応できない。2026年、AIエージェントワークフローDAGエンジンは本番システムの標準装備となっている:DAG(有向非巡回グラフ)により、タスク依存、並列スケジューリング、条件ルーティングがハードコードから宣言的設定へと進化した。

本記事の主な収穫:

  • DAGワークフローエンジンのコア概念とアーキテクチャを理解
  • タスク定義から監視アラートまで7つの本番級DAGオーケストレーションパターンを習得
  • 本番環境でそのまま使える完全なPython実装
  • 5つのよくある落とし穴と解決策、10のエラートラブルシューティング
  • カスタムDAG vs LangGraph vs Prefectの比較分析

目次

  1. DAGワークフローコア概念
  2. パターン1:タスク定義と依存グラフ構築
  3. パターン2:トポロジカルソートと並列スケジューリング
  4. パターン3:条件ルーティングと分岐マージ
  5. パターン4:エラー回復とリトライ戦略
  6. パターン5:状態永続化とチェックポイントリカバリ
  7. パターン6:動的DAGとサブグラフネスト
  8. パターン7:本番監視とアラート
  9. 5つのよくある落とし穴と解決策
  10. 10のよくあるエラートラブルシューティング
  11. 高度な最適化テクニック
  12. 比較分析:カスタムDAG vs LangGraph vs Prefect
  13. オンラインツール推奨
  14. まとめ

DAGワークフローコア概念

DAG(Directed Acyclic Graph、有向非巡回グラフ)はAIエージェントワークフローエンジンの数学的基盤である。各ノードはタスク(エージェント呼び出し、ツール実行、データ変換)を表し、各エッジは依存関係を表す。

┌──────────────────────────────────────────────────────────────┐
│                    DAGワークフローエンジンアーキテクチャ        │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌─────┐     ┌─────┐     ┌─────┐                          │
│   │ A   │────▶│ B   │────▶│ D   │  ← 直列依存チェーン       │
│   └──┬──┘     └─────┘     └─────┘                          │
│      │                       ▲                               │
│      │     ┌─────┐          │                               │
│      └────▶│ C   │──────────┘  ← B、C並列、Dは両方を待機     │
│            └──┬──┘                                            │
│               │         ┌─────┐                              │
│               └────────▶│ E   │  ← 条件ルーティング:C→E或C→F│
│                         └─────┘                              │
│               ┌─────┐                                        │
│               │ F   │  ← 条件分岐のもう一つのパス              │
│               └─────┘                                        │
│                                                              │
│   コア保証:                                                   │
│   1. 非巡回 — A→B→C→A の循環依存は存在しない                  │
│   2. トポロジカル順序 — 少なくとも1つの正当な実行順序が存在     │
│   3. 並列度 — 依存関係のないノードは同時実行可能               │
└──────────────────────────────────────────────────────────────┘

主要用語

用語 説明
Node(ノード) ワークフロー内の実行単位(LLM呼び出し、ツール実行、データ変換)
Edge(エッジ) ノード間の依存関係。通常エッジと条件エッジに分かれる
DAG(有向非巡回グラフ) ノードとエッジからなるグラフ構造。循環なしを保証
トポロジカルソート DAGノードを正当な実行順序に並べるアルゴリズム
レベル トポロジカルソート後、同じレベルのノードは並列実行可能
Checkpoint ワークフロー実行状態のスナップショット。リカバリに使用
条件ルーティング 実行時状態に基づき動的に次の実行ノードを選択

なぜDAGはリニアパイプラインより優れているのか?

側面 リニアパイプライン DAGワークフロー
並列実行 ❌ 直列のみ ✅ 依存のないノードが並列実行
条件分岐 ⚠️ ハードコードif-else ✅ 宣言的条件エッジ
エラー回復 ❌ 最初からやり直し ✅ チェックポイントリカバリ
可視化 ⚠️ 理解が困難 ✅ グラフ構造が直感的
拡張性 ❌ 変更が全体に波及 ✅ 局所的変更、全体の安全性

パターン1:タスク定義と依存グラフ構築

AIエージェントワークフローDAGエンジンの第一歩は、タスクノードと依存関係の定義である。Pythonで型安全なDAG定義システムを実装する。

基本データモデル

from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
import hashlib
import json


class NodeType(Enum):
    LLM_CALL = "llm_call"
    TOOL_CALL = "tool_call"
    TRANSFORM = "transform"
    CONDITION = "condition"
    PARALLEL_GROUP = "parallel_group"
    SUB_WORKFLOW = "sub_workflow"
    HUMAN_APPROVAL = "human_approval"


class EdgeType(Enum):
    NORMAL = "normal"
    CONDITIONAL = "conditional"


@dataclass
class RetryPolicy:
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    backoff_factor: float = 2.0
    retryable_exceptions: list[type[Exception]] = field(
        default_factory=lambda: [Exception]
    )


@dataclass
class NodeDefinition:
    node_id: str
    node_type: NodeType
    handler: Callable[..., Any] | None = None
    timeout_seconds: float = 300.0
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    metadata: dict[str, Any] = field(default_factory=dict)

    def __hash__(self):
        return hash(self.node_id)

    def __eq__(self, other):
        if isinstance(other, NodeDefinition):
            return self.node_id == other.node_id
        return False


@dataclass
class EdgeDefinition:
    source_id: str
    target_id: str
    edge_type: EdgeType = EdgeType.NORMAL
    condition: Callable[..., bool] | None = None
    condition_name: str = ""

    def __hash__(self):
        return hash((self.source_id, self.target_id, self.condition_name))

DAGビルダー

class DAGBuilder:
    def __init__(self, workflow_id: str, name: str = ""):
        self.workflow_id = workflow_id
        self.name = name
        self._nodes: dict[str, NodeDefinition] = {}
        self._edges: list[EdgeDefinition] = []
        self._entry_node: str | None = None

    def add_node(self, node: NodeDefinition) -> DAGBuilder:
        if node.node_id in self._nodes:
            raise ValueError(f"Node '{node.node_id}' already exists")
        self._nodes[node.node_id] = node
        return self

    def add_edge(
        self,
        source_id: str,
        target_id: str,
        edge_type: EdgeType = EdgeType.NORMAL,
        condition: Callable[..., bool] | None = None,
        condition_name: str = "",
    ) -> DAGBuilder:
        if source_id not in self._nodes:
            raise ValueError(f"Source node '{source_id}' not found")
        if target_id not in self._nodes:
            raise ValueError(f"Target node '{target_id}' not found")
        self._edges.append(
            EdgeDefinition(
                source_id=source_id,
                target_id=target_id,
                edge_type=edge_type,
                condition=condition,
                condition_name=condition_name,
            )
        )
        return self

    def set_entry(self, node_id: str) -> DAGBuilder:
        if node_id not in self._nodes:
            raise ValueError(f"Entry node '{node_id}' not found")
        self._entry_node = node_id
        return self

    def build(self) -> DAGDefinition:
        if not self._entry_node:
            raise ValueError("Entry node not set")
        dag = DAGDefinition(
            workflow_id=self.workflow_id,
            name=self.name,
            nodes=dict(self._nodes),
            edges=list(self._edges),
            entry_node=self._entry_node,
        )
        dag.validate()
        return dag


@dataclass
class DAGDefinition:
    workflow_id: str
    name: str
    nodes: dict[str, NodeDefinition]
    edges: list[EdgeDefinition]
    entry_node: str

    def validate(self):
        self._check_cycle()
        self._check_reachability()

    def _check_cycle(self):
        adjacency: dict[str, set[str]] = {
            nid: set() for nid in self.nodes
        }
        for edge in self.edges:
            adjacency[edge.source_id].add(edge.target_id)

        visited: set[str] = set()
        recursion_stack: set[str] = set()

        def dfs(node_id: str) -> bool:
            visited.add(node_id)
            recursion_stack.add(node_id)
            for neighbor in adjacency.get(node_id, set()):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in recursion_stack:
                    return True
            recursion_stack.remove(node_id)
            return False

        for node_id in self.nodes:
            if node_id not in visited:
                if dfs(node_id):
                    raise ValueError(
                        f"Cycle detected in DAG '{self.workflow_id}'"
                    )

    def _check_reachability(self):
        reachable: set[str] = set()
        stack = [self.entry_node]
        while stack:
            current = stack.pop()
            if current in reachable:
                continue
            reachable.add(current)
            for edge in self.edges:
                if edge.source_id == current:
                    stack.append(edge.target_id)

        unreachable = set(self.nodes.keys()) - reachable
        if unreachable:
            raise ValueError(
                f"Unreachable nodes detected: {unreachable}"
            )

    def fingerprint(self) -> str:
        data = {
            "nodes": sorted(self.nodes.keys()),
            "edges": [
                {"s": e.source_id, "t": e.target_id, "c": e.condition_name}
                for e in sorted(
                    self.edges,
                    key=lambda e: (e.source_id, e.target_id),
                )
            ],
        }
        raw = json.dumps(data, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()[:12]

構築例:コンテンツ生成ワークフロー

def fetch_topic(state: dict) -> dict:
    return {"topic": state.get("input", "AI技術トレンド")}

def research(state: dict) -> dict:
    topic = state["topic"]
    return {"research_data": f"{topic}に関する深い研究データ..."}

def analyze(state: dict) -> dict:
    data = state["research_data"]
    return {"analysis": f"{data}に基づく分析結論..."}

def write_draft(state: dict) -> dict:
    analysis = state["analysis"]
    return {"draft": f"分析{analysis}に基づく下書き内容..."}

def review(state: dict) -> dict:
    draft = state["draft"]
    return {"review_result": "approved", "final_content": draft}

def needs_revision(state: dict) -> bool:
    return state.get("review_result") == "needs_revision"

def is_approved(state: dict) -> bool:
    return state.get("review_result") == "approved"


builder = (
    DAGBuilder("content-gen-v1", "コンテンツ生成ワークフロー")
    .add_node(NodeDefinition("fetch", NodeType.TRANSFORM, handler=fetch_topic))
    .add_node(NodeDefinition("research", NodeType.LLM_CALL, handler=research))
    .add_node(NodeDefinition("analyze", NodeType.LLM_CALL, handler=analyze))
    .add_node(NodeDefinition("write", NodeType.LLM_CALL, handler=write_draft))
    .add_node(NodeDefinition("review", NodeType.LLM_CALL, handler=review))
    .add_edge("fetch", "research")
    .add_edge("research", "analyze")
    .add_edge("analyze", "write")
    .add_edge("write", "review")
    .add_edge(
        "review", "write",
        EdgeType.CONDITIONAL,
        condition=needs_revision,
        condition_name="needs_revision",
    )
    .set_entry("fetch")
)

dag = builder.build()
print(f"DAG fingerprint: {dag.fingerprint()}")

パターン2:トポロジカルソートと並列スケジューリング

DAGエンジンのコアスケジューリング能力はトポロジカルソートに由来する。ソート後、同じレベルのノードには相互依存がなく、並列実行が可能——これがAIワークフローエンジンのパフォーマンスの鍵である。

トポロジカルソートとレベル計算

from collections import deque


class TopologicalSorter:
    def __init__(self, dag: DAGDefinition):
        self.dag = dag
        self._adjacency: dict[str, set[str]] = {nid: set() for nid in dag.nodes}
        self._in_degree: dict[str, int] = {nid: 0 for nid in dag.nodes}
        for edge in dag.edges:
            if edge.edge_type == EdgeType.NORMAL:
                self._adjacency[edge.source_id].add(edge.target_id)
                self._in_degree[edge.target_id] += 1

    def sort(self) -> list[str]:
        in_degree = dict(self._in_degree)
        queue = deque(
            nid for nid, deg in in_degree.items() if deg == 0
        )
        result = []
        while queue:
            node_id = queue.popleft()
            result.append(node_id)
            for neighbor in self._adjacency[node_id]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        if len(result) != len(self.dag.nodes):
            raise ValueError("DAG contains a cycle (should have been caught in validation)")
        return result

    def compute_levels(self) -> dict[str, int]:
        levels: dict[str, int] = {}
        order = self.sort()
        for node_id in order:
            max_parent_level = -1
            for edge in self.dag.edges:
                if edge.target_id == node_id and edge.edge_type == EdgeType.NORMAL:
                    parent_level = levels.get(edge.source_id, 0)
                    max_parent_level = max(max_parent_level, parent_level)
            levels[node_id] = max_parent_level + 1
        return levels

    def get_parallel_groups(self) -> list[list[str]]:
        levels = self.compute_levels()
        max_level = max(levels.values()) if levels else 0
        groups: list[list[str]] = []
        for level in range(max_level + 1):
            group = [nid for nid, lvl in levels.items() if lvl == level]
            if group:
                groups.append(group)
        return groups

並列スケジューラ

import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class NodeResult:
    node_id: str
    status: str
    output: dict = field(default_factory=dict)
    error: str | None = None
    start_time: float = 0.0
    end_time: float = 0.0
    retry_count: int = 0


@dataclass
class WorkflowResult:
    workflow_id: str
    execution_id: str
    status: str
    state: dict = field(default_factory=dict)
    node_results: dict[str, NodeResult] = field(default_factory=dict)
    total_time: float = 0.0


class DAGScheduler:
    def __init__(self, dag: DAGDefinition, max_concurrency: int = 10):
        self.dag = dag
        self.max_concurrency = max_concurrency
        self._sorter = TopologicalSorter(dag)
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def execute(self, initial_state: dict | None = None) -> WorkflowResult:
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        start_time = time.time()

        parallel_groups = self._sorter.get_parallel_groups()

        for group in parallel_groups:
            tasks = [
                self._execute_node(node_id, state, node_results)
                for node_id in group
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for i, result in enumerate(results):
                node_id = group[i]
                if isinstance(result, Exception):
                    node_results[node_id] = NodeResult(
                        node_id=node_id,
                        status="failed",
                        error=str(result),
                    )
                    return WorkflowResult(
                        workflow_id=self.dag.workflow_id,
                        execution_id=execution_id,
                        status="failed",
                        state=state,
                        node_results=node_results,
                        total_time=time.time() - start_time,
                    )
                node_results[node_id] = result
                state.update(result.output)

        return WorkflowResult(
            workflow_id=self.dag.workflow_id,
            execution_id=execution_id,
            status="completed",
            state=state,
            node_results=node_results,
            total_time=time.time() - start_time,
        )

    async def _execute_node(
        self,
        node_id: str,
        state: dict,
        node_results: dict[str, NodeResult],
    ) -> NodeResult:
        node = self.dag.nodes[node_id]
        start_time = time.time()

        async with self._semaphore:
            try:
                if asyncio.iscoroutinefunction(node.handler):
                    output = await node.handler(state)
                else:
                    output = await asyncio.to_thread(node.handler, state)

                if not isinstance(output, dict):
                    output = {"result": output}

                return NodeResult(
                    node_id=node_id,
                    status="completed",
                    output=output,
                    start_time=start_time,
                    end_time=time.time(),
                )
            except Exception as e:
                return NodeResult(
                    node_id=node_id,
                    status="failed",
                    error=str(e),
                    start_time=start_time,
                    end_time=time.time(),
                )

実行例

async def parallel_research_a(state: dict) -> dict:
    await asyncio.sleep(0.1)
    return {"research_a": "技術トレンド研究データ"}

async def parallel_research_b(state: dict) -> dict:
    await asyncio.sleep(0.1)
    return {"research_b": "市場分析研究データ"}

async def merge_research(state: dict) -> dict:
    return {
        "merged": f"{state.get('research_a', '')} + {state.get('research_b', '')}"
    }

builder = (
    DAGBuilder("parallel-research", "並列研究ワークフロー")
    .add_node(NodeDefinition("start", NodeType.TRANSFORM, handler=lambda s: s))
    .add_node(NodeDefinition("research_a", NodeType.LLM_CALL, handler=parallel_research_a))
    .add_node(NodeDefinition("research_b", NodeType.LLM_CALL, handler=parallel_research_b))
    .add_node(NodeDefinition("merge", NodeType.TRANSFORM, handler=merge_research))
    .add_edge("start", "research_a")
    .add_edge("start", "research_b")
    .add_edge("research_a", "merge")
    .add_edge("research_b", "merge")
    .set_entry("start")
)

dag = builder.build()
scheduler = DAGScheduler(dag)
result = await scheduler.execute({"input": "AIエージェントワークフローDAGエンジン"})

print(f"Status: {result.status}")
print(f"Total time: {result.total_time:.3f}s")
print(f"Parallel groups: {TopologicalSorter(dag).get_parallel_groups()}")

パターン3:条件ルーティングと分岐マージ

実際のAIワークフローは単一パスではない。エージェント出力、データ品質、ユーザー設定に基づいて動的に実行パスを選択することが、DAGオーケストレーションの中核能力である。

条件ルーターの実装

class ConditionalRouter:
    def __init__(self, dag: DAGDefinition):
        self.dag = dag
        self._conditional_edges: dict[str, list[EdgeDefinition]] = {}
        for edge in dag.edges:
            if edge.edge_type == EdgeType.CONDITIONAL:
                self._conditional_edges.setdefault(edge.source_id, []).append(edge)

    def resolve_next_nodes(
        self, node_id: str, state: dict
    ) -> list[str]:
        next_nodes: list[str] = []
        for edge in self.dag.edges:
            if edge.source_id != node_id:
                continue
            if edge.edge_type == EdgeType.NORMAL:
                next_nodes.append(edge.target_id)
            elif edge.edge_type == EdgeType.CONDITIONAL:
                if edge.condition and edge.condition(state):
                    next_nodes.append(edge.target_id)
        return next_nodes

    def get_all_branches(self) -> dict[str, list[str]]:
        branches: dict[str, list[str]] = {}
        for source_id, edges in self._conditional_edges.items():
            branches[source_id] = [
                f"{e.condition_name} → {e.target_id}" for e in edges
            ]
        return branches

条件ルーティング付きスケジューラ

class ConditionalDAGScheduler(DAGScheduler):
    def __init__(self, dag: DAGDefinition, max_concurrency: int = 10):
        super().__init__(dag, max_concurrency)
        self._router = ConditionalRouter(dag)

    async def execute(self, initial_state: dict | None = None) -> WorkflowResult:
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        start_time = time.time()

        completed: set[str] = set()
        pending: set[str] = {self.dag.entry_node}

        while pending:
            ready: list[str] = []
            for node_id in list(pending):
                deps = self._get_dependencies(node_id)
                if deps.issubset(completed):
                    ready.append(node_id)

            if not ready:
                raise RuntimeError(
                    f"Deadlock detected. Pending: {pending}, Completed: {completed}"
                )

            tasks = [
                self._execute_node(node_id, state, node_results)
                for node_id in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for i, result in enumerate(results):
                node_id = ready[i]
                if isinstance(result, Exception):
                    node_results[node_id] = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                    return WorkflowResult(
                        workflow_id=self.dag.workflow_id,
                        execution_id=execution_id,
                        status="failed",
                        state=state,
                        node_results=node_results,
                        total_time=time.time() - start_time,
                    )
                node_results[node_id] = result
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)

                next_nodes = self._router.resolve_next_nodes(node_id, state)
                for next_id in next_nodes:
                    if next_id not in completed:
                        pending.add(next_id)

        return WorkflowResult(
            workflow_id=self.dag.workflow_id,
            execution_id=execution_id,
            status="completed",
            state=state,
            node_results=node_results,
            total_time=time.time() - start_time,
        )

    def _get_dependencies(self, node_id: str) -> set[str]:
        deps: set[str] = set()
        for edge in self.dag.edges:
            if edge.target_id == node_id:
                deps.add(edge.source_id)
        return deps

例:スマートカスタマーサービスルーティング

def classify_intent(state: dict) -> dict:
    user_input = state.get("user_input", "")
    if "返金" in user_input:
        return {"intent": "refund", "confidence": 0.95}
    elif "技術" in user_input or "bug" in user_input.lower():
        return {"intent": "technical", "confidence": 0.90}
    else:
        return {"intent": "general", "confidence": 0.80}

def handle_refund(state: dict) -> dict:
    return {"response": "返金プロセスを開始しました。3-5営業日で着金予定です"}

def handle_technical(state: dict) -> dict:
    return {"response": "テクニカルサポートチームが問題を確認しました。2時間以内に対応します"}

def handle_general(state: dict) -> dict:
    return {"response": "お問い合わせありがとうございます。担当者がまもなくご案内します"}

def is_refund(state: dict) -> bool:
    return state.get("intent") == "refund"

def is_technical(state: dict) -> bool:
    return state.get("intent") == "technical"

def is_general(state: dict) -> bool:
    return state.get("intent") == "general"


builder = (
    DAGBuilder("customer-service", "スマートカスタマーサービスルーティング")
    .add_node(NodeDefinition("classify", NodeType.LLM_CALL, handler=classify_intent))
    .add_node(NodeDefinition("refund_handler", NodeType.LLM_CALL, handler=handle_refund))
    .add_node(NodeDefinition("tech_handler", NodeType.LLM_CALL, handler=handle_technical))
    .add_node(NodeDefinition("general_handler", NodeType.LLM_CALL, handler=handle_general))
    .add_node(NodeDefinition("respond", NodeType.TRANSFORM, handler=lambda s: {"final": s.get("response", "")}))
    .add_edge("classify", "refund_handler", EdgeType.CONDITIONAL, is_refund, "is_refund")
    .add_edge("classify", "tech_handler", EdgeType.CONDITIONAL, is_technical, "is_technical")
    .add_edge("classify", "general_handler", EdgeType.CONDITIONAL, is_general, "is_general")
    .add_edge("refund_handler", "respond")
    .add_edge("tech_handler", "respond")
    .add_edge("general_handler", "respond")
    .set_entry("classify")
)

dag = builder.build()
scheduler = ConditionalDAGScheduler(dag)
result = await scheduler.execute({"user_input": "返金したいです。商品に欠陥があります"})
print(f"Response: {result.state.get('final', '')}")

パターン4:エラー回復とリトライ戦略

AIワークフローでは、LLM呼び出しやAPIリクエストがいつでも失敗する可能性がある。リトライとエラー回復のないDAGエンジンは本番環境では受け入れられない。

リトライエグゼキューターの実装

import random
import logging

logger = logging.getLogger(__name__)


class RetryExecutor:
    def __init__(self, retry_policy: RetryPolicy):
        self.policy = retry_policy

    async def execute_with_retry(
        self,
        handler: Callable[..., Any],
        state: dict,
        node_id: str,
    ) -> NodeResult:
        last_error: Exception | None = None
        retry_count = 0

        for attempt in range(self.policy.max_retries + 1):
            try:
                if asyncio.iscoroutinefunction(handler):
                    output = await handler(state)
                else:
                    output = await asyncio.to_thread(handler, state)

                if not isinstance(output, dict):
                    output = {"result": output}

                return NodeResult(
                    node_id=node_id,
                    status="completed",
                    output=output,
                    retry_count=retry_count,
                )
            except tuple(self.policy.retryable_exceptions) as e:
                last_error = e
                retry_count += 1
                if attempt < self.policy.max_retries:
                    delay = min(
                        self.policy.base_delay
                        * (self.policy.backoff_factor ** attempt),
                        self.policy.max_delay,
                    )
                    jitter = random.uniform(0, delay * 0.1)
                    logger.warning(
                        f"Node '{node_id}' failed (attempt {attempt + 1}/"
                        f"{self.policy.max_retries + 1}), "
                        f"retrying in {delay + jitter:.2f}s: {e}"
                    )
                    await asyncio.sleep(delay + jitter)
            except Exception as e:
                last_error = e
                break

        return NodeResult(
            node_id=node_id,
            status="failed",
            error=str(last_error),
            retry_count=retry_count,
        )

リトライとフォールバック付きスケジューラ

class ResilientDAGScheduler(ConditionalDAGScheduler):
    def __init__(
        self,
        dag: DAGDefinition,
        max_concurrency: int = 10,
        fallback_handlers: dict[str, Callable] | None = None,
    ):
        super().__init__(dag, max_concurrency)
        self._fallback_handlers = fallback_handlers or {}

    async def _execute_node(
        self,
        node_id: str,
        state: dict,
        node_results: dict[str, NodeResult],
    ) -> NodeResult:
        node = self.dag.nodes[node_id]
        retry_executor = RetryExecutor(node.retry_policy)
        result = await retry_executor.execute_with_retry(
            node.handler, state, node_id
        )

        if result.status == "failed" and node_id in self._fallback_handlers:
            logger.info(f"Node '{node_id}' failed, executing fallback handler")
            try:
                fallback = self._fallback_handlers[node_id]
                if asyncio.iscoroutinefunction(fallback):
                    output = await fallback(state)
                else:
                    output = await asyncio.to_thread(fallback, state)
                if not isinstance(output, dict):
                    output = {"result": output}
                return NodeResult(
                    node_id=node_id,
                    status="completed_with_fallback",
                    output=output,
                    retry_count=result.retry_count,
                )
            except Exception as fallback_error:
                result.error = f"Primary: {result.error}; Fallback: {fallback_error}"

        return result

使用例

async def call_llm_with_retry(state: dict) -> dict:
    if random.random() < 0.5:
        raise ConnectionError("LLM API timeout")
    return {"llm_response": "分析結果..."}

def fallback_llm(state: dict) -> dict:
    return {"llm_response": "フォールバック:キャッシュ結果を使用"}

builder = (
    DAGBuilder("resilient-workflow", "耐障害ワークフロー")
    .add_node(
        NodeDefinition(
            "llm_call",
            NodeType.LLM_CALL,
            handler=call_llm_with_retry,
            retry_policy=RetryPolicy(
                max_retries=3,
                base_delay=0.5,
                retryable_exceptions=[ConnectionError, TimeoutError],
            ),
        )
    )
    .set_entry("llm_call")
)

dag = builder.build()
scheduler = ResilientDAGScheduler(
    dag,
    fallback_handlers={"llm_call": fallback_llm},
)
result = await scheduler.execute({"input": "test"})
print(f"Status: {result.status}")

パターン5:状態永続化とチェックポイントリカバリ

長時間実行されるAIワークフロー(マルチラウンドエージェント協調、大規模データ処理など)では状態永続化が必須である。実行途中でクラッシュしてもチェックポイントからリカバリでき、最初からやり直す必要がない。

チェックポイントマネージャー

import json
from pathlib import Path
from datetime import datetime


class CheckpointManager:
    def __init__(self, storage_dir: str = ".checkpoints"):
        self._storage = Path(storage_dir)
        self._storage.mkdir(parents=True, exist_ok=True)

    def save(
        self,
        workflow_id: str,
        execution_id: str,
        state: dict,
        completed_nodes: set[str],
        pending_nodes: set[str],
        node_results: dict[str, NodeResult],
    ) -> str:
        checkpoint_id = f"cp-{int(time.time() * 1000)}"
        checkpoint_data = {
            "checkpoint_id": checkpoint_id,
            "workflow_id": workflow_id,
            "execution_id": execution_id,
            "state": state,
            "completed_nodes": list(completed_nodes),
            "pending_nodes": list(pending_nodes),
            "node_results": {
                nid: {
                    "node_id": r.node_id,
                    "status": r.status,
                    "output": r.output,
                    "error": r.error,
                    "retry_count": r.retry_count,
                }
                for nid, r in node_results.items()
            },
            "saved_at": datetime.now().isoformat(),
        }
        filepath = self._storage / f"{workflow_id}_{execution_id}.json"
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(checkpoint_data, f, ensure_ascii=False, indent=2)
        return checkpoint_id

    def load(
        self, workflow_id: str, execution_id: str
    ) -> dict | None:
        filepath = self._storage / f"{workflow_id}_{execution_id}.json"
        if not filepath.exists():
            return None
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)

    def list_checkpoints(self, workflow_id: str) -> list[dict]:
        checkpoints = []
        for fp in self._storage.glob(f"{workflow_id}_*.json"):
            with open(fp, "r", encoding="utf-8") as f:
                data = json.load(f)
                checkpoints.append({
                    "execution_id": data["execution_id"],
                    "saved_at": data["saved_at"],
                    "completed": len(data["completed_nodes"]),
                    "pending": len(data["pending_nodes"]),
                })
        return sorted(checkpoints, key=lambda x: x["saved_at"], reverse=True)

    def cleanup(self, workflow_id: str, keep_last: int = 5):
        checkpoints = self.list_checkpoints(workflow_id)
        for cp in checkpoints[keep_last:]:
            filepath = self._storage / f"{workflow_id}_{cp['execution_id']}.json"
            filepath.unlink(missing_ok=True)

チェックポイントリカバリ対応スケジューラ

class PersistentDAGScheduler(ResilientDAGScheduler):
    def __init__(
        self,
        dag: DAGDefinition,
        checkpoint_manager: CheckpointManager,
        max_concurrency: int = 10,
        checkpoint_interval: int = 1,
        fallback_handlers: dict[str, Callable] | None = None,
    ):
        super().__init__(dag, max_concurrency, fallback_handlers)
        self._checkpoint_mgr = checkpoint_manager
        self._checkpoint_interval = checkpoint_interval

    async def execute(
        self,
        initial_state: dict | None = None,
        execution_id: str | None = None,
    ) -> WorkflowResult:
        if execution_id:
            return await self._resume(execution_id, initial_state)
        return await self._run_from_start(initial_state)

    async def _run_from_start(
        self, initial_state: dict | None = None
    ) -> WorkflowResult:
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        completed: set[str] = set()
        pending: set[str] = {self.dag.entry_node}
        start_time = time.time()
        steps_since_checkpoint = 0

        while pending:
            ready = [
                nid for nid in pending
                if self._get_dependencies(nid).issubset(completed)
            ]
            if not ready:
                raise RuntimeError("Deadlock in DAG execution")

            tasks = [
                self._execute_node(nid, state, node_results)
                for nid in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for i, result in enumerate(results):
                node_id = ready[i]
                if isinstance(result, Exception):
                    node_results[node_id] = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                    self._checkpoint_mgr.save(
                        self.dag.workflow_id, execution_id,
                        state, completed, pending, node_results,
                    )
                    return WorkflowResult(
                        workflow_id=self.dag.workflow_id,
                        execution_id=execution_id,
                        status="failed",
                        state=state,
                        node_results=node_results,
                        total_time=time.time() - start_time,
                    )
                node_results[node_id] = result
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)

                next_nodes = self._router.resolve_next_nodes(node_id, state)
                for next_id in next_nodes:
                    if next_id not in completed:
                        pending.add(next_id)

                steps_since_checkpoint += 1
                if steps_since_checkpoint >= self._checkpoint_interval:
                    self._checkpoint_mgr.save(
                        self.dag.workflow_id, execution_id,
                        state, completed, pending, node_results,
                    )
                    steps_since_checkpoint = 0

        return WorkflowResult(
            workflow_id=self.dag.workflow_id,
            execution_id=execution_id,
            status="completed",
            state=state,
            node_results=node_results,
            total_time=time.time() - start_time,
        )

    async def _resume(
        self,
        execution_id: str,
        initial_state: dict | None = None,
    ) -> WorkflowResult:
        checkpoint = self._checkpoint_mgr.load(
            self.dag.workflow_id, execution_id
        )
        if not checkpoint:
            raise ValueError(
                f"No checkpoint found for {self.dag.workflow_id}/{execution_id}"
            )

        state = checkpoint["state"]
        if initial_state:
            state.update(initial_state)

        completed = set(checkpoint["completed_nodes"])
        pending = set(checkpoint["pending_nodes"])
        node_results = {
            nid: NodeResult(
                node_id=r["node_id"],
                status=r["status"],
                output=r["output"],
                error=r.get("error"),
                retry_count=r.get("retry_count", 0),
            )
            for nid, r in checkpoint["node_results"].items()
        }

        failed_nodes = {
            nid for nid, r in node_results.items() if r.status == "failed"
        }
        pending.update(failed_nodes)

        start_time = time.time()
        steps_since_checkpoint = 0

        while pending:
            ready = [
                nid for nid in pending
                if self._get_dependencies(nid).issubset(completed)
            ]
            if not ready:
                break

            tasks = [
                self._execute_node(nid, state, node_results)
                for nid in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for i, result in enumerate(results):
                node_id = ready[i]
                if isinstance(result, Exception):
                    node_results[node_id] = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                    self._checkpoint_mgr.save(
                        self.dag.workflow_id, execution_id,
                        state, completed, pending, node_results,
                    )
                    return WorkflowResult(
                        workflow_id=self.dag.workflow_id,
                        execution_id=execution_id,
                        status="failed",
                        state=state,
                        node_results=node_results,
                        total_time=time.time() - start_time,
                    )
                node_results[node_id] = result
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)

                next_nodes = self._router.resolve_next_nodes(node_id, state)
                for next_id in next_nodes:
                    if next_id not in completed:
                        pending.add(next_id)

                steps_since_checkpoint += 1
                if steps_since_checkpoint >= self._checkpoint_interval:
                    self._checkpoint_mgr.save(
                        self.dag.workflow_id, execution_id,
                        state, completed, pending, node_results,
                    )
                    steps_since_checkpoint = 0

        return WorkflowResult(
            workflow_id=self.dag.workflow_id,
            execution_id=execution_id,
            status="completed",
            state=state,
            node_results=node_results,
            total_time=time.time() - start_time,
        )

パターン6:動的DAGとサブグラフネスト

本番環境ではDAGは静的ではない。実行時データに基づいて動的にサブタスクを生成し、サブワークフローをネストすることが高度なオーケストレーションの鍵となる。

動的DAG生成

class DynamicDAGGenerator:
    def __init__(self, base_dag: DAGDefinition):
        self.base_dag = base_dag

    def generate_dynamic_nodes(
        self,
        state: dict,
        dynamic_node_factory: Callable[[dict], list[NodeDefinition]],
        dependency_resolver: Callable[[list[NodeDefinition], dict], list[EdgeDefinition]],
    ) -> tuple[list[NodeDefinition], list[EdgeDefinition]]:
        new_nodes = dynamic_node_factory(state)
        new_edges = dependency_resolver(new_nodes, state)
        return new_nodes, new_edges

    def merge_into_base(
        self,
        new_nodes: list[NodeDefinition],
        new_edges: list[EdgeDefinition],
        attach_after: str,
    ) -> DAGDefinition:
        builder = DAGBuilder(
            f"{self.base_dag.workflow_id}-dynamic",
            f"{self.base_dag.name} (dynamic)",
        )
        for node in self.base_dag.nodes.values():
            builder.add_node(node)
        for edge in self.base_dag.edges:
            builder.add_edge(
                edge.source_id, edge.target_id,
                edge.edge_type, edge.condition, edge.condition_name,
            )
        for node in new_nodes:
            builder.add_node(node)
        for edge in new_edges:
            builder.add_edge(
                edge.source_id, edge.target_id,
                edge.edge_type, edge.condition, edge.condition_name,
            )
        builder.set_entry(self.base_dag.entry_node)
        return builder.build()

サブグラフネスト

class SubWorkflowNode:
    def __init__(
        self,
        sub_dag: DAGDefinition,
        scheduler_class: type = ConditionalDAGScheduler,
        max_concurrency: int = 5,
    ):
        self.sub_dag = sub_dag
        self._scheduler_class = scheduler_class
        self._max_concurrency = max_concurrency

    async def execute(self, state: dict) -> dict:
        scheduler = self._scheduler_class(
            self.sub_dag, max_concurrency=self._max_concurrency
        )
        result = await scheduler.execute(state)
        if result.status != "completed":
            raise RuntimeError(
                f"Sub-workflow '{self.sub_dag.workflow_id}' failed: "
                f"{[r.error for r in result.node_results.values() if r.error]}"
            )
        return result.state


def create_sub_workflow_node(
    node_id: str,
    sub_dag: DAGDefinition,
    max_concurrency: int = 5,
) -> NodeDefinition:
    sub_executor = SubWorkflowNode(sub_dag, max_concurrency=max_concurrency)
    return NodeDefinition(
        node_id=node_id,
        node_type=NodeType.SUB_WORKFLOW,
        handler=sub_executor.execute,
        metadata={"sub_workflow_id": sub_dag.workflow_id},
    )

例:マルチソースデータ収集

def create_data_source_nodes(state: dict) -> list[NodeDefinition]:
    sources = state.get("data_sources", ["api", "database", "file"])
    nodes = []
    for source in sources:
        async def fetch_data(s: dict, src=source) -> dict:
            await asyncio.sleep(0.1)
            return {f"{src}_data": f"{src}からのデータ"}
        nodes.append(
            NodeDefinition(
                f"fetch_{source}",
                NodeType.TOOL_CALL,
                handler=fetch_data,
            )
        )
    return nodes

def resolve_dynamic_edges(
    new_nodes: list[NodeDefinition], state: dict
) -> list[EdgeDefinition]:
    edges = []
    for node in new_nodes:
        edges.append(EdgeDefinition(source_id="start", target_id=node.node_id))
        edges.append(EdgeDefinition(source_id=node.node_id, target_id="aggregate"))
    return edges

sub_builder = (
    DAGBuilder("data-collection", "データ収集サブグラフ")
    .add_node(NodeDefinition("start", NodeType.TRANSFORM, handler=lambda s: s))
    .add_node(NodeDefinition("aggregate", NodeType.TRANSFORM, handler=lambda s: {"aggregated": "全データ統合"}))
    .set_entry("start")
)
sub_dag = sub_builder.build()

builder = (
    DAGBuilder("main-workflow", "メインワークフロー")
    .add_node(NodeDefinition("plan", NodeType.LLM_CALL, handler=lambda s: {**s, "data_sources": ["api", "database", "file"]}))
    .add_node(create_sub_workflow_node("collect", sub_dag))
    .add_node(NodeDefinition("report", NodeType.LLM_CALL, handler=lambda s: {"report": "最終レポート"}))
    .add_edge("plan", "collect")
    .add_edge("collect", "report")
    .set_entry("plan")
)

main_dag = builder.build()
scheduler = ConditionalDAGScheduler(main_dag)
result = await scheduler.execute({"input": "データ収集レポートを生成"})

パターン7:本番監視とアラート

AIエージェントワークフローDAGエンジンが本番稼働すると、監視が運用の生命線となる。各ノードの実行時間、成功率、エラー分布を把握する必要がある。

メトリクスコレクター

from collections import defaultdict
import statistics


@dataclass
class NodeMetrics:
    node_id: str
    total_executions: int = 0
    success_count: int = 0
    failure_count: int = 0
    fallback_count: int = 0
    total_retry_count: int = 0
    execution_times: list[float] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        if self.total_executions == 0:
            return 0.0
        return self.success_count / self.total_executions

    @property
    def avg_execution_time(self) -> float:
        if not self.execution_times:
            return 0.0
        return statistics.mean(self.execution_times)

    @property
    def p95_execution_time(self) -> float:
        if len(self.execution_times) < 2:
            return self.avg_execution_time
        sorted_times = sorted(self.execution_times)
        idx = int(len(sorted_times) * 0.95)
        return sorted_times[min(idx, len(sorted_times) - 1)]

    @property
    def p99_execution_time(self) -> float:
        if len(self.execution_times) < 2:
            return self.avg_execution_time
        sorted_times = sorted(self.execution_times)
        idx = int(len(sorted_times) * 0.99)
        return sorted_times[min(idx, len(sorted_times) - 1)]


class MetricsCollector:
    def __init__(self):
        self._node_metrics: dict[str, NodeMetrics] = defaultdict(
            lambda: NodeMetrics(node_id="")
        )
        self._workflow_count = 0
        self._workflow_success = 0
        self._workflow_failure = 0

    def record_node_result(self, result: NodeResult):
        metrics = self._node_metrics[result.node_id]
        metrics.node_id = result.node_id
        metrics.total_executions += 1
        metrics.total_retry_count += result.retry_count

        if result.status == "completed":
            metrics.success_count += 1
        elif result.status == "completed_with_fallback":
            metrics.fallback_count += 1
            metrics.success_count += 1
        else:
            metrics.failure_count += 1

        exec_time = result.end_time - result.start_time
        if exec_time > 0:
            metrics.execution_times.append(exec_time)

    def record_workflow_result(self, result: WorkflowResult):
        self._workflow_count += 1
        if result.status == "completed":
            self._workflow_success += 1
        else:
            self._workflow_failure += 1
        for node_result in result.node_results.values():
            self.record_node_result(node_result)

    def get_node_metrics(self, node_id: str) -> NodeMetrics | None:
        return self._node_metrics.get(node_id)

    def get_all_metrics(self) -> dict[str, NodeMetrics]:
        return dict(self._node_metrics)

    def summary(self) -> dict:
        return {
            "total_workflows": self._workflow_count,
            "success_workflows": self._workflow_success,
            "failed_workflows": self._workflow_failure,
            "workflow_success_rate": (
                self._workflow_success / self._workflow_count
                if self._workflow_count > 0
                else 0.0
            ),
            "nodes": {
                nid: {
                    "success_rate": f"{m.success_rate:.2%}",
                    "avg_time": f"{m.avg_execution_time:.3f}s",
                    "p95_time": f"{m.p95_execution_time:.3f}s",
                    "p99_time": f"{m.p99_execution_time:.3f}s",
                    "total_retries": m.total_retry_count,
                    "fallback_count": m.fallback_count,
                }
                for nid, m in self._node_metrics.items()
            },
        }

アラートルール

class AlertRule:
    def __init__(
        self,
        name: str,
        condition: Callable[[NodeMetrics], bool],
        severity: str = "warning",
        message_template: str = "",
    ):
        self.name = name
        self.condition = condition
        self.severity = severity
        self.message_template = message_template

    def check(self, metrics: NodeMetrics) -> str | None:
        if self.condition(metrics):
            return self.message_template.format(
                node_id=metrics.node_id,
                success_rate=f"{metrics.success_rate:.2%}",
                avg_time=f"{metrics.avg_execution_time:.3f}s",
            )
        return None


class AlertManager:
    def __init__(self):
        self._rules: list[AlertRule] = []
        self._alerts: list[dict] = []

    def add_rule(self, rule: AlertRule):
        self._rules.append(rule)

    def check_metrics(self, metrics_collector: MetricsCollector):
        for node_id, metrics in metrics_collector.get_all_metrics().items():
            for rule in self._rules:
                alert_msg = rule.check(metrics)
                if alert_msg:
                    self._alerts.append({
                        "rule": rule.name,
                        "severity": rule.severity,
                        "node_id": node_id,
                        "message": alert_msg,
                        "timestamp": datetime.now().isoformat(),
                    })

    def get_alerts(self, severity: str | None = None) -> list[dict]:
        if severity:
            return [a for a in self._alerts if a["severity"] == severity]
        return list(self._alerts)


alert_mgr = AlertManager()
alert_mgr.add_rule(AlertRule(
    name="low_success_rate",
    condition=lambda m: m.total_executions >= 5 and m.success_rate < 0.8,
    severity="critical",
    message_template="Node {node_id} success rate {success_rate} below 80%",
))
alert_mgr.add_rule(AlertRule(
    name="high_latency",
    condition=lambda m: m.avg_execution_time > 30.0,
    severity="warning",
    message_template="Node {node_id} avg execution time {avg_time} exceeds 30s",
))
alert_mgr.add_rule(AlertRule(
    name="high_retry_rate",
    condition=lambda m: m.total_executions > 0
    and m.total_retry_count / m.total_executions > 2.0,
    severity="warning",
    message_template="Node {node_id} has high retry rate, avg retries per execution > 2",
))

5つのよくある落とし穴と解決策

落とし穴1:条件エッジの循環検出漏れ

条件エッジは実行時にのみ有効化されるため、静的循環検出が実行時のループを見逃す可能性がある。

def validate_conditional_cycles(dag: DAGDefinition):
    all_edges = list(dag.edges)
    for edge in all_edges:
        if edge.edge_type == EdgeType.CONDITIONAL:
            test_edges = [
                e for e in all_edges
                if not (e.source_id == edge.source_id
                        and e.target_id == edge.target_id
                        and e.edge_type == EdgeType.CONDITIONAL)
            ]
            test_edges.append(EdgeDefinition(
                source_id=edge.source_id,
                target_id=edge.target_id,
                edge_type=EdgeType.NORMAL,
            ))
            test_dag = DAGDefinition(
                workflow_id=dag.workflow_id + "-test",
                name=dag.name,
                nodes=dag.nodes,
                edges=test_edges,
                entry_node=dag.entry_node,
            )
            try:
                test_dag._check_cycle()
            except ValueError:
                raise ValueError(
                    f"Conditional edge '{edge.source_id}' → '{edge.target_id}' "
                    f"may create a runtime cycle"
                )

解決策:すべての条件エッジについて「有効化されたと仮定」の循環検出を行い、いかなる条件の組み合わせでもループが発生しないことを確認する。

落とし穴2:並列ノードの書き込み競合

複数の並列ノードが同時にstate内の同じキーを変更し、データが上書きされる。

def validate_parallel_write_safety(dag: DAGDefinition):
    levels = TopologicalSorter(dag).compute_levels()
    level_groups: dict[int, list[str]] = {}
    for nid, level in levels.items():
        level_groups.setdefault(level, []).append(nid)

    for level, nodes in level_groups.items():
        if len(nodes) <= 1:
            continue
        output_keys: dict[str, list[str]] = {}
        for nid in nodes:
            node = dag.nodes[nid]
            keys = node.metadata.get("output_keys", [])
            for key in keys:
                output_keys.setdefault(key, []).append(nid)

        conflicts = {k: v for k, v in output_keys.items() if len(v) > 1}
        if conflicts:
            raise ValueError(
                f"Parallel write conflict at level {level}: {conflicts}"
            )

解決策:DAG検証時に並列ノードの出力キーの競合を確認するか、名前空間で分離する。

落とし穴3:チェックポイントのシリアライズ失敗

stateにシリアライズ不可能なオブジェクト(DB接続、ファイルハンドルなど)が含まれ、チェックポイント保存が失敗する。

import pickle

def safe_serialize_state(state: dict) -> bytes:
    try:
        return pickle.dumps(state)
    except (pickle.PicklingError, TypeError) as e:
        clean_state = {}
        for key, value in state.items():
            try:
                pickle.dumps(value)
                clean_state[key] = value
            except (pickle.PicklingError, TypeError):
                clean_state[key] = f"<non-serializable: {type(value).__name__}>"
        return pickle.dumps(clean_state)

解決策:ハンドラーからはJSONシリアライズ可能なデータのみを返すか、カスタムシリアライザーを使用する。

落とし穴4:条件ルーティングのマッチングなし

すべての条件エッジのconditionがFalseを返し、ワークフローが停止する。

def ensure_default_branch(dag: DAGDefinition) -> DAGDefinition:
    conditional_sources = set()
    for edge in dag.edges:
        if edge.edge_type == EdgeType.CONDITIONAL:
            conditional_sources.add(edge.source_id)

    builder = DAGBuilder(
        f"{dag.workflow_id}-safe", f"{dag.name} (safe)"
    )
    for node in dag.nodes.values():
        builder.add_node(node)

    for edge in dag.edges:
        builder.add_edge(
            edge.source_id, edge.target_id,
            edge.edge_type, edge.condition, edge.condition_name,
        )

    for source_id in conditional_sources:
        has_normal = any(
            e.source_id == source_id and e.edge_type == EdgeType.NORMAL
            for e in dag.edges
        )
        if not has_normal:
            builder.add_node(
                NodeDefinition(
                    f"{source_id}_default",
                    NodeType.TRANSFORM,
                    handler=lambda s: {"routed_to_default": True},
                )
            )
            builder.add_edge(source_id, f"{source_id}_default")

    builder.set_entry(dag.entry_node)
    return builder.build()

解決策:すべての条件ルーティングノードにデフォルトブランチを追加し、少なくとも1つのパスが常に実行可能であることを保証する。

落とし穴5:サブグラフの状態漏洩

サブワークフローが親ワークフローのstateを変更し、予期しない副作用を引き起こす。

def isolate_sub_workflow_state(
    parent_state: dict, sub_workflow_input_keys: list[str]
) -> tuple[dict, Callable[[dict], dict]]:
    isolated = {k: parent_state[k] for k in sub_workflow_input_keys if k in parent_state}

    def merge_back(sub_state: dict) -> dict:
        output_keys = set(sub_workflow_input_keys)
        return {k: v for k, v in sub_state.items() if k not in output_keys}

    return isolated, merge_back

解決策:サブワークフロー実行時に必要なキーのみを渡し、返却時は新しいキーのみをマージする。


10のよくあるエラートラブルシューティング

# エラーメッセージ 原因 解決策
1 Cycle detected in DAG ノード間に循環依存がある Edge定義を確認し、ループを形成するエッジを削除
2 Unreachable nodes detected エントリから到達可能なパスがない Edge接続の欠落を確認
3 Entry node not found set_entryで指定したノードが存在しない node_idのスペルを確認
4 Source/Target node not found add_edgeが存在しないノードを参照 add_nodeをadd_edgeの前に実行
5 Deadlock detected 条件ルーティングにマッチなし、デフォルトなし デフォルトブランチを追加または条件関数を確認
6 Node failed after N retries LLM APIが継続的にタイムアウトまたはエラー API Key、ネットワーク、フォールバック戦略を確認
7 Sub-workflow failed サブワークフロー内部ノードの失敗 サブワークフローのnode_resultsで特定
8 Checkpoint serialization error stateにシリアライズ不可能なオブジェクト ハンドラーはdict[str, Any]のみ返す
9 Parallel write conflict 並列ノードの出力キーが競合 名前空間で出力キーを分離
10 Runtime cycle via conditional edge 条件エッジが実行時にループを形成 validate_conditional_cyclesで確認

高度な最適化テクニック

1. 非同期プリフェッチ:次レベルのノード依存を事前ロード

class PrefetchScheduler(PersistentDAGScheduler):
    async def _run_from_start(self, initial_state=None):
        execution_id = f"exec-{int(time.time() * 1000)}"
        state = dict(initial_state or {})
        node_results: dict[str, NodeResult] = {}
        completed: set[str] = set()
        pending: set[str] = {self.dag.entry_node}
        start_time = time.time()

        while pending:
            ready = [
                nid for nid in pending
                if self._get_dependencies(nid).issubset(completed)
            ]
            if not ready:
                break

            prefetch_tasks = []
            for nid in ready:
                node = self.dag.nodes[nid]
                if node.node_type == NodeType.LLM_CALL:
                    prefetch_tasks.append(
                        asyncio.create_task(self._warmup_llm(nid))
                    )

            tasks = [
                self._execute_node(nid, state, node_results)
                for nid in ready
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            if prefetch_tasks:
                await asyncio.gather(*prefetch_tasks, return_exceptions=True)

            for i, result in enumerate(results):
                node_id = ready[i]
                if isinstance(result, Exception):
                    node_results[node_id] = NodeResult(
                        node_id=node_id, status="failed", error=str(result)
                    )
                    return WorkflowResult(
                        workflow_id=self.dag.workflow_id,
                        execution_id=execution_id,
                        status="failed",
                        state=state,
                        node_results=node_results,
                        total_time=time.time() - start_time,
                    )
                node_results[node_id] = result
                state.update(result.output)
                completed.add(node_id)
                pending.discard(node_id)

                next_nodes = self._router.resolve_next_nodes(node_id, state)
                for next_id in next_nodes:
                    if next_id not in completed:
                        pending.add(next_id)

        return WorkflowResult(
            workflow_id=self.dag.workflow_id,
            execution_id=execution_id,
            status="completed",
            state=state,
            node_results=node_results,
            total_time=time.time() - start_time,
        )

    async def _warmup_llm(self, node_id: str):
        logger.info(f"Warming up LLM connection for node '{node_id}'")
        await asyncio.sleep(0.01)

2. タイムアウトサーキットブレーカー:遅いノードがワークフロー全体を引きずらないようにする

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_max: int = 1,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self._failure_count = 0
        self._last_failure_time: float = 0
        self._state = "closed"
        self._half_open_count = 0

    async def call(self, handler: Callable, state: dict) -> dict:
        if self._state == "open":
            if time.time() - self._last_failure_time > self.recovery_timeout:
                self._state = "half_open"
                self._half_open_count = 0
            else:
                raise RuntimeError("Circuit breaker is OPEN")

        try:
            result = await handler(state) if asyncio.iscoroutinefunction(handler) else await asyncio.to_thread(handler, state)
            if self._state == "half_open":
                self._half_open_count += 1
                if self._half_open_count >= self.half_open_max:
                    self._state = "closed"
                    self._failure_count = 0
            return result
        except Exception as e:
            self._failure_count += 1
            self._last_failure_time = time.time()
            if self._failure_count >= self.failure_threshold:
                self._state = "open"
            raise

3. DAG可視化:Mermaid図の自動生成

def dag_to_mermaid(dag: DAGDefinition) -> str:
    lines = ["graph TD"]
    for edge in dag.edges:
        style = ""
        if edge.edge_type == EdgeType.CONDITIONAL:
            style = f"|{edge.condition_name}|"
        lines.append(f"    {edge.source_id} -->{style} {edge.target_id}")

    for nid, node in dag.nodes.items():
        label = f"{nid}\\n({node.node_type.value})"
        lines.append(f"    {nid}[\"{label}\"]")

    return "\n".join(lines)

オンラインMermaidエディタを使ってDAG可視化図を直接レンダリングできる。


比較分析:カスタムDAG vs LangGraph vs Prefect

側面 カスタムDAGエンジン LangGraph Prefect
言語 Python(拡張可能) Python Python
DAG定義 宣言的Builder StateGraph Flow + Task
並列実行 ✅ 自動レベルベース ✅ asyncioベース ✅ ネイティブDask/Ray
条件ルーティング ✅ 条件エッジ ✅ conditional_edges ✅ branch
状態永続化 ✅ CheckpointManager ✅ Checkpointer ✅ Result + Storage
チェックポイントリカバリ ✅ ネイティブサポート ✅ 設定必要 ⚠️ 自前構築
エラー回復 ✅ リトライ+フォールバック ⚠️ 自前構築 ✅ ネイティブリトライ
LLM統合 ⚠️ 自前構築 ✅ LangChainエコシステム ⚠️ 自前構築
可視化 ✅ Mermaidエクスポート ✅ LangGraph Studio ✅ Prefect UI
学習曲線 中程度 中程度 低い
本番監視 ✅ カスタムMetrics ⚠️ LangSmith ✅ Prefect Cloud
動的DAG ✅ 実行時生成 ✅ Command ✅ 動的タスク
サブグラフネスト ✅ SubWorkflowNode ✅ Subgraph ⚠️ サブFlow
コミュニティ ❌ 自己保守 ✅ 活発 ✅ 活発
適用シナリオ 高度なカスタマイズ要件 LangChainユーザー 汎用タスクオーケストレーション

選定ガイド:

  • カスタムDAGエンジン:深いカスタマイズ、既存システムとの緊密な統合、極限のパフォーマンス要件
  • LangGraph:LangChainエコシステム内、迅速なプロトタイピング、LLMネイティブサポート重視
  • Prefect:汎用タスクオーケストレーション、LLM以外の混合ワークフロー、すぐに使えるUI

LangGraphマルチエージェント協調について詳しくは、Python LangGraphマルチエージェント協調実践を参照。エージェントメモリアーキテクチャについては、AIエージェントメモリアーキテクチャ設計を参照。エージェントツール使用については、Python AIエージェントツール使用ガイドを参照。


オンラインツール推奨

ツール 用途 リンク
JSONフォーマッター DAG定義JSONの表示・編集 /ja/json/format
Mermaidエディタ DAGワークフロー図の可視化 /ja/dev/mermaid
Curl→コード変換 API呼び出しコードの迅速生成 /ja/dev/curl-to-code

まとめ

AIエージェントワークフローDAGエンジンは、2026年の本番級AIシステムのコアインフラである。本記事では7つの本番パターンを網羅した:

  1. タスク定義と依存グラフ構築 — 型安全なDAG Builder、自動循環検出と到達性検証
  2. トポロジカルソートと並列スケジューリング — レベルベースの自動並列実行、asyncio並行制御
  3. 条件ルーティングと分岐マージ — 宣言的条件エッジ、実行時動的ルーティング
  4. エラー回復とリトライ戦略 — 指数バックオフリトライ、フォールバックハンドラー、サーキットブレーカー
  5. 状態永続化とチェックポイントリカバリ — チェックポイント機構、クラッシュ後のリカバリ
  6. 動的DAGとサブグラフネスト — 実行時サブタスク生成、サブワークフローのカプセル化と再利用
  7. 本番監視とアラート — ノードレベルのメトリクス収集、成功率/レイテンシ監視、アラートルール

コア原則:DAGはAIワークフローを「ハードコードパイプライン」から「宣言的オーケストレーション」へと進化させる——エージェントシステムがプロトタイプから本番へ移行するための必須の道である。

他のAIエージェント実践記事:

外部参考:

ブラウザローカルツールを無料で試す →

#AI Agent#DAG#工作流引擎#任务编排#LangGraph#并行执行#2026#AI与大数据