Python RAGナレッジグラフ:GraphRAGプロダクション実装の6つのコアパターン

AI与大数据

RAGの課題:純粋なベクトル検索ではなぜ不十分なのか?

慎重にチューニングしたRAGシステムに、ユーザーが「田中さんと佐藤さんはどのプロジェクトで協力しましたか?」と質問すると、二人の個人情報ばかり返ってきて、協力関係は全く見つからない。これは決して稀なケースではない——純粋なベクトルRAGは本質的にエンティティ間の構造化関係を失い、長文書のマルチホップ推論能力はほぼゼロで、ハルシネーション率も高い。

さらに深刻なのは、従来のRAGが文書をチャンクに分割してベクトル化するため、エンティティ関係が切り刻まれ、コンテキストが断片化されることだ。「Aの上司の上司は誰か」というマルチホップ質問には、ベクトル検索では全く対応できない。GraphRAGは、ナレッジグラフを通じてRAGに欠けていたピース——構造化関係推論——を補完する。


コア概念クイックリファレンス

概念 説明 代表的な実装
GraphRAG ナレッジグラフとベクトル検索を融合したRAGパラダイム Microsoft GraphRAG、LightRAG
ナレッジグラフ エンティティ-関係-エンティティのトリプルで構造化知識を格納 Neo4j、NebulaGraph
エンティティ抽出 非構造化テキストから固有表現を識別 LLM抽出、spaCy、GLiNER
関係抽出 エンティティ間の意味的関係を識別 LLM関係抽出、REモデル
コミュニティ検出 グラフ内の密なサブグラフ構造を発見 Leidenアルゴリズム、Louvainアルゴリズム
グラフ埋め込み グラフ構造をベクトル空間にマッピング Node2Vec、TransE、GAT
グラフトラバーサル 関係エッジに沿ったマルチホップクエリ Cypherクエリ、BFS/DFS

問題分析:GraphRAGの5つの主要課題

# 課題 具体的な現れ 影響
1 グラフ構築品質 LLMによるエンティティ関係抽出のノイズが多く、同義エンティティが統合されていない グラフの冗長ノードが多く、検索結果が混乱
2 エンティティ曖昧性解除 「りんご」は果物か会社か?同名エンティティを区別できない 関係の接続ミス、推論結果の偏り
3 グラフ更新・保守 増分更新で新旧関係が競合、フル再構築はコストが高い 知識の陳腐化、グラフとソースデータの乖離
4 クエリプランニングの複雑さ ユーザーの自然言語をグラフクエリに変換、パスが不確定 クエリ失敗や無関係な結果の返却
5 グラフとベクトルの融合 グラフトラバーサル結果とベクトル検索結果のランキング・統合方法 融合戦略の不適切さが精度を逆に低下

これら5つの課題は密接に連鎖している:グラフ品質の低さが曖昧性解除を困難にし、曖昧性解除の失敗が更新保守の負担を増やし、クエリプランニングは高品質なグラフに依存し、融合戦略は前述の全工程に依存する。プロダクション級GraphRAGは、これらの課題を体系的に解決しなければならない。


ステップバイステップ実装:6つのコアパターン

パターン1:LLMベースのエンティティ関係抽出

GraphRAGの第一歩——非構造化テキストからエンティティと関係のトリプルを抽出。

from dataclasses import dataclass, field
from openai import OpenAI


@dataclass
class Triple:
    subject: str
    predicate: str
    object: str
    source_text: str = ""


class LLMTripleExtractor:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def extract(self, text: str) -> list[Triple]:
        prompt = (
            "以下のテキストからすべてのエンティティ関係トリプルを抽出してください。\n"
            "出力形式:1行に1つのトリプル、| で区切り、形式は エンティティ1|関係|エンティティ2\n"
            "要件:\n"
            "1. エンティティは正式名称を使用\n"
            "2. 関係は簡潔な動詞を使用\n"
            "3. 代名詞は実際のエンティティ名に置き換え\n\n"
            f"テキスト:{text}"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
            max_tokens=1000,
        )
        content = response.choices[0].message.content
        triples = []
        for line in content.strip().split("\n"):
            parts = [p.strip() for p in line.split("|")]
            if len(parts) == 3:
                triples.append(Triple(
                    subject=parts[0],
                    predicate=parts[1],
                    object=parts[2],
                    source_text=text,
                ))
        return triples

    def batch_extract(self, texts: list[str]) -> list[Triple]:
        all_triples = []
        for text in texts:
            all_triples.extend(self.extract(text))
        return all_triples


extractor = LLMTripleExtractor(api_key="your-api-key")
text = "田中さんはAI部門のテックリードで、スマートチャットボットプロジェクトを率いており、GPT-4モデルを使用している。"
triples = extractor.extract(text)
for t in triples:
    print(f"{t.subject} --[{t.predicate}]--> {t.object}")

適用シーン:文書ナレッジグラフ構築、エンタープライズ知識ベースの構造化。


パターン2:Neo4jグラフストレージとインデックス

抽出したトリプルをNeo4jに格納し、効率的なクエリのためのインデックスを構築。

from neo4j import GraphDatabase


class Neo4jGraphStore:
    def __init__(self, uri: str = "bolt://localhost:7687",
                 user: str = "neo4j", password: str = "password"):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self._create_indexes()

    def _create_indexes(self) -> None:
        with self.driver.session() as session:
            session.run(
                "CREATE CONSTRAINT entity_name IF NOT EXISTS "
                "FOR (e:Entity) REQUIRE e.name IS UNIQUE"
            )
            session.run(
                "CREATE INDEX entity_type_idx IF NOT EXISTS "
                "FOR (e:Entity) ON (e.type)"
            )

    def add_triple(self, triple: Triple) -> None:
        with self.driver.session() as session:
            session.run(
                "MERGE (s:Entity {name: $subject}) "
                "ON CREATE SET s.type = 'unknown' "
                "MERGE (o:Entity {name: $object}) "
                "ON CREATE SET o.type = 'unknown' "
                "MERGE (s)-[r:RELATED {predicate: $predicate}]->(o) "
                "ON CREATE SET r.source = $source",
                subject=triple.subject,
                object=triple.object,
                predicate=triple.predicate,
                source=triple.source_text[:200],
            )

    def add_triples_batch(self, triples: list[Triple]) -> None:
        with self.driver.session() as session:
            for triple in triples:
                session.run(
                    "MERGE (s:Entity {name: $subject}) "
                    "MERGE (o:Entity {name: $object}) "
                    "MERGE (s)-[r:RELATED {predicate: $predicate}]->(o)",
                    subject=triple.subject,
                    object=triple.object,
                    predicate=triple.predicate,
                )

    def query_neighbors(self, entity_name: str,
                        depth: int = 1) -> list[dict]:
        with self.driver.session() as session:
            result = session.run(
                "MATCH path = (e:Entity {name: $name})-[:RELATED*1.."
                f"{depth}]-(neighbor) "
                "RETURN nodes(path) as nodes, "
                "relationships(path) as rels",
                name=entity_name,
            )
            return [record.data() for record in result]

    def search_by_predicate(self, predicate: str) -> list[dict]:
        with self.driver.session() as session:
            result = session.run(
                "MATCH (s)-[r:RELATED {predicate: $predicate}]->(o) "
                "RETURN s.name as subject, o.name as object",
                predicate=predicate,
            )
            return [record.data() for record in result]

    def close(self) -> None:
        self.driver.close()


store = Neo4jGraphStore()
store.add_triples_batch(triples)
neighbors = store.query_neighbors("田中", depth=2)
print(f"田中の2ホップ隣接: {neighbors}")
store.close()

適用シーン:大規模ナレッジグラフストレージ、マルチホップ関係クエリ。


パターン3:コミュニティ検出と要約生成

Leidenアルゴリズムでグラフコミュニティを発見し、各コミュニティの要約を生成してグローバルな質問に対応。

import community as community_louvain
import networkx as nx
from openai import OpenAI


class CommunityDetector:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def build_graph(self, triples: list[Triple]) -> nx.Graph:
        G = nx.Graph()
        for t in triples:
            G.add_node(t.subject)
            G.add_node(t.object)
            G.add_edge(t.subject, t.object, predicate=t.predicate)
        return G

    def detect_communities(self, G: nx.Graph,
                           resolution: float = 1.0) -> dict[int, list[str]]:
        partition = community_louvain.best_partition(
            G, resolution=resolution
        )
        communities: dict[int, list[str]] = {}
        for node, comm_id in partition.items():
            communities.setdefault(comm_id, []).append(node)
        return communities

    def summarize_community(self, G: nx.Graph,
                            members: list[str]) -> str:
        subgraph = G.subgraph(
            [n for n in members if n in G.nodes]
        )
        edges_info = []
        for u, v, data in subgraph.edges(data=True):
            edges_info.append(f"{u} -[{data.get('predicate', '')}]-> {v}")
        prompt = (
            "以下のナレッジグラフコミュニティの要約を生成し、"
            "コアテーマと主要な関係を概括してください:\n\n"
            f"エンティティ:{', '.join(members)}\n"
            f"関係:\n" + "\n".join(edges_info)
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=300,
        )
        return response.choices[0].message.content

    def process(self, triples: list[Triple]) -> dict[int, dict]:
        G = self.build_graph(triples)
        communities = self.detect_communities(G)
        results = {}
        for comm_id, members in communities.items():
            summary = self.summarize_community(G, members)
            results[comm_id] = {
                "members": members,
                "summary": summary,
                "size": len(members),
            }
        return results


detector = CommunityDetector(api_key="your-api-key")
community_results = detector.process(triples)
for cid, info in community_results.items():
    print(f"コミュニティ{cid}({info['size']}エンティティ): {info['summary'][:80]}")

適用シーン:グローバルな質問への回答、ナレッジグラフのテーマ分析。


パターン4:グラフトラバーサル検索拡張

ユーザー質問からエンティティを識別し、グラフ関係をトラバーサルしてコンテキストを取得、RAG検索を拡張。

import re


class GraphTraversalRetriever:
    def __init__(self, graph_store: Neo4jGraphStore,
                 max_depth: int = 2):
        self.store = graph_store
        self.max_depth = max_depth

    def extract_entities_from_query(self, query: str) -> list[str]:
        known_entities = self._get_all_entity_names()
        found = []
        for entity in known_entities:
            if entity in query:
                found.append(entity)
        return found

    def _get_all_entity_names(self) -> list[str]:
        with self.store.driver.session() as session:
            result = session.run("MATCH (e:Entity) RETURN e.name AS name")
            return [r["name"] for r in result]

    def retrieve(self, query: str) -> list[dict]:
        entities = self.extract_entities_from_query(query)
        if not entities:
            return []
        context = []
        for entity in entities:
            neighbors = self.store.query_neighbors(
                entity, depth=self.max_depth
            )
            context.append({
                "seed_entity": entity,
                "traversal_depth": self.max_depth,
                "subgraph": neighbors,
            })
        return context

    def format_context(self, results: list[dict]) -> str:
        parts = []
        for r in results:
            parts.append(f"エンティティ【{r['seed_entity']}】からの{r['traversal_depth']}ホップトラバーサル結果:")
            for record in r["subgraph"]:
                parts.append(f"  {record}")
        return "\n".join(parts)


retriever = GraphTraversalRetriever(store, max_depth=2)
results = retriever.retrieve("田中さんはどのプロジェクトで働いていますか?")
print(retriever.format_context(results))

適用シーン:マルチホップ関係クエリ、エンティティ中心の知識検索。


パターン5:グラフ・ベクトルハイブリッド検索

グラフトラバーサルとベクトル検索を融合し、双方の長所を活かしたより精度の高い知識リコールを実現。

import numpy as np
from dataclasses import dataclass


@dataclass
class HybridResult:
    content: str
    graph_score: float
    vector_score: float
    combined_score: float
    source: str


class HybridGraphVectorRetriever:
    def __init__(self, graph_store: Neo4jGraphStore,
                 vector_dim: int = 1536,
                 graph_weight: float = 0.6,
                 vector_weight: float = 0.4):
        self.graph_store = graph_store
        self.vector_dim = vector_dim
        self.graph_weight = graph_weight
        self.vector_weight = vector_weight
        self.doc_embeddings: dict[str, np.ndarray] = {}
        self.doc_contents: dict[str, str] = {}

    def add_document(self, doc_id: str, content: str,
                     embedding: np.ndarray) -> None:
        self.doc_embeddings[doc_id] = embedding
        self.doc_contents[doc_id] = content

    def vector_search(self, query_embedding: np.ndarray,
                      top_k: int = 5) -> list[tuple[str, float]]:
        scores = []
        for doc_id, emb in self.doc_embeddings.items():
            sim = float(np.dot(query_embedding, emb) /
                        (np.linalg.norm(query_embedding) *
                         np.linalg.norm(emb) + 1e-8))
            scores.append((doc_id, sim))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]

    def hybrid_search(self, query: str,
                      query_embedding: np.ndarray,
                      top_k: int = 5) -> list[HybridResult]:
        graph_results = self._graph_search(query)
        vector_results = self.vector_search(query_embedding, top_k=top_k)
        combined = {}
        for doc_id, vec_score in vector_results:
            combined[doc_id] = {
                "vector_score": vec_score,
                "graph_score": 0.0,
                "content": self.doc_contents.get(doc_id, ""),
            }
        for item in graph_results:
            doc_id = item.get("doc_id", "")
            if doc_id in combined:
                combined[doc_id]["graph_score"] = item.get("score", 0.5)
            else:
                combined[doc_id] = {
                    "vector_score": 0.0,
                    "graph_score": item.get("score", 0.5),
                    "content": item.get("content", ""),
                }
        results = []
        for doc_id, scores in combined.items():
            combined_score = (
                self.graph_weight * scores["graph_score"] +
                self.vector_weight * scores["vector_score"]
            )
            results.append(HybridResult(
                content=scores["content"],
                graph_score=scores["graph_score"],
                vector_score=scores["vector_score"],
                combined_score=combined_score,
                source=doc_id,
            ))
        results.sort(key=lambda x: x.combined_score, reverse=True)
        return results[:top_k]

    def _graph_search(self, query: str) -> list[dict]:
        entities = []
        with self.graph_store.driver.session() as session:
            result = session.run("MATCH (e:Entity) RETURN e.name AS name")
            for r in result:
                if r["name"] in query:
                    entities.append(r["name"])
        graph_items = []
        for entity in entities:
            neighbors = self.graph_store.query_neighbors(entity, depth=1)
            graph_items.append({
                "doc_id": f"graph_{entity}",
                "score": 0.8,
                "content": str(neighbors)[:500],
            })
        return graph_items


hybrid = HybridGraphVectorRetriever(store, graph_weight=0.6, vector_weight=0.4)
hybrid.add_document("doc1", "田中さんはAI部門のスマートチャットボットプロジェクトを担当", np.random.randn(1536))
query_emb = np.random.randn(1536)
results = hybrid.hybrid_search("田中さんはどのプロジェクトを担当していますか?", query_emb, top_k=3)
for r in results:
    print(f"[G:{r.graph_score:.2f} V:{r.vector_score:.2f} C:{r.combined_score:.2f}] {r.content[:60]}")

適用シーン:構造化関係と意味的類似度の両方を活用する必要があるRAGシステム。


パターン6:エンドツーエンドGraphRAGパイプライン

上記のパターンを完全なパイプラインに連結:文書入力→エンティティ抽出→グラフストレージ→コミュニティ検出→ハイブリッド検索→回答生成。

from dataclasses import dataclass


@dataclass
class GraphRAGConfig:
    extraction_model: str = "gpt-4o-mini"
    community_resolution: float = 1.0
    traversal_depth: int = 2
    graph_weight: float = 0.6
    vector_weight: float = 0.4
    max_context_tokens: int = 3000


class GraphRAGPipeline:
    def __init__(self, neo4j_uri: str, neo4j_user: str,
                 neo4j_password: str, api_key: str,
                 config: GraphRAGConfig | None = None):
        self.config = config or GraphRAGConfig()
        self.extractor = LLMTripleExtractor(
            api_key=api_key, model=self.config.extraction_model
        )
        self.graph_store = Neo4jGraphStore(
            uri=neo4j_uri, user=neo4j_user, password=neo4j_password
        )
        self.community_detector = CommunityDetector(
            api_key=api_key, model=self.config.extraction_model
        )
        self.hybrid_retriever = HybridGraphVectorRetriever(
            self.graph_store,
            graph_weight=self.config.graph_weight,
            vector_weight=self.config.vector_weight,
        )
        self.client = OpenAI(api_key=api_key)
        self._community_summaries: dict[int, str] = {}

    def ingest(self, documents: list[str]) -> None:
        all_triples = self.extractor.batch_extract(documents)
        self.graph_store.add_triples_batch(all_triples)
        community_results = self.community_detector.process(all_triples)
        self._community_summaries = {
            cid: info["summary"]
            for cid, info in community_results.items()
        }

    def query(self, question: str,
              query_embedding: np.ndarray | None = None) -> str:
        context_parts = []
        graph_results = self.hybrid_retriever._graph_search(question)
        for item in graph_results:
            context_parts.append(item.get("content", ""))
        if query_embedding is not None:
            vector_results = self.hybrid_retriever.vector_search(
                query_embedding, top_k=3
            )
            for doc_id, score in vector_results:
                content = self.hybrid_retriever.doc_contents.get(doc_id, "")
                if content:
                    context_parts.append(content)
        for summary in self._community_summaries.values():
            if any(kw in summary for kw in question.split()):
                context_parts.append(f"[コミュニティ要約] {summary}")
        context = "\n\n".join(context_parts)
        if len(context) > self.config.max_context_tokens * 4:
            context = context[:self.config.max_context_tokens * 4]
        prompt = (
            "以下のナレッジグラフ検索結果に基づいて質問に答えてください。"
            "コンテキストに十分な情報がない場合は、明確にその旨を述べてください。\n\n"
            f"コンテキスト:\n{context}\n\n質問:{question}"
        )
        response = self.client.chat.completions.create(
            model=self.config.extraction_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        return response.choices[0].message.content

    def close(self) -> None:
        self.graph_store.close()


pipeline = GraphRAGPipeline(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
    api_key="your-api-key",
)
pipeline.ingest([
    "田中さんはAI部門のテックリードで、スマートチャットボットプロジェクトをGPT-4で率いている。",
    "佐藤さんはデータ部門の責任者で、SparkとFlinkを使ったデータプラットフォームを担当している。",
    "スマートチャットボットプロジェクトとデータプラットフォームはユーザープロファイリングモジュールで協力している。",
])
answer = pipeline.query("田中さんと佐藤さんはどのモジュールで協力していますか?")
print(answer)
pipeline.close()

適用シーン:エンタープライズナレッジベースQ&A、マルチホップ関係推論、グローバルな質問分析。


よくある落とし穴:5つの罠

罠1:エンティティ抽出で重複排除と正規化を行わない

誤ったアプローチ

def extract_and_store(text: str):
    triples = extractor.extract(text)
    for t in triples:
        store.add_triple(t)

正しいアプローチ

ENTITY_ALIASES = {"AI部門": "人工知能部", "GPT-4": "GPT-4", "田中": "田中"}

def normalize_entity(name: str) -> str:
    return ENTITY_ALIASES.get(name, name)

def extract_and_store(text: str):
    triples = extractor.extract(text)
    for t in triples:
        t.subject = normalize_entity(t.subject)
        t.object = normalize_entity(t.object)
        store.add_triple(t)

罠2:グラフトラバーサルに深さ制限を設けない

誤ったアプローチ

result = session.run(
    "MATCH path = (e:Entity {name: $name})-[:RELATED*]-(n) RETURN path",
    name=entity_name,
)

正しいアプローチ

MAX_DEPTH = 3

result = session.run(
    f"MATCH path = (e:Entity {{name: $name}})-[:RELATED*1..{MAX_DEPTH}]-(n) "
    "RETURN path LIMIT 50",
    name=entity_name,
)

罠3:コミュニティ要約をキャッシュせず、毎回再生成する

誤ったアプローチ

def get_community_summary(community_id: int) -> str:
    return detector.summarize_community(G, members)

正しいアプローチ

from functools import lru_cache

@lru_cache(maxsize=128)
def get_community_summary(community_id: int) -> str:
    return detector.summarize_community(G, members)

罠4:ハイブリッド検索の重みをハードコードで固定する

誤ったアプローチ

combined = 0.5 * graph_score + 0.5 * vector_score

正しいアプローチ

def adaptive_weights(query: str) -> tuple[float, float]:
    entity_count = count_entities_in_query(query)
    if entity_count >= 2:
        return 0.7, 0.3
    elif entity_count == 1:
        return 0.5, 0.5
    else:
        return 0.3, 0.7

gw, vw = adaptive_weights(query)
combined = gw * graph_score + vw * vector_score

罠5:グラフの増分更新で競合検出を行わない

誤ったアプローチ

def update_triple(triple: Triple):
    session.run("MERGE (s)-[r:RELATED {predicate: $p}]->(o)", ...)

正しいアプローチ

def update_triple(triple: Triple):
    existing = session.run(
        "MATCH (s:Entity {name: $subj})-[r:RELATED]->(o:Entity {name: $obj}) "
        "RETURN r.predicate AS pred, r.version AS ver",
        subj=triple.subject, obj=triple.object,
    ).data()
    if existing and existing[0]["pred"] != triple.predicate:
        session.run(
            "MATCH (s:Entity {name: $subj})-[r:RELATED]->(o:Entity {name: $obj}) "
            "SET r.predicate = $pred, r.version = r.version + 1, "
            "r.updated_at = datetime()",
            subj=triple.subject, obj=triple.object, pred=triple.predicate,
        )
    else:
        session.run("MERGE (s)-[r:RELATED {predicate: $p}]->(o)", ...)

エラートラブルシューティング:10のよくあるエラー

# エラーメッセージ 原因 解決策
1 Neo4j connection refused Neo4jサービスが未起動またはポート設定エラー Dockerコンテナの状態を確認、boltポート7687を確認
2 Constraint violation: Entity name already exists 同名エンティティの重複挿入でプロパティが競合 MERGEをCREATEの代わりに使用、または更新前に照会
3 LLM extraction returns empty triples プロンプト設計の不備またはテキストが短すぎる 抽出プロンプトを最適化、入力テキストを200-500文字に制限
4 Community detection returns single community グラフのエッジが少ない、またはresolutionパラメータが不適切 トリプル数を増やす、resolution値を下げる
5 Graph traversal timeout トラバーサル深度が大きすぎる、またはスーパーノードが存在 深度を≤3に制限、LIMIT句を追加
6 Embedding dimension mismatch in hybrid search グラフ埋め込みとテキスト埋め込みの次元が異なる 次元を統一、または投影層でアライメント
7 Memory exceeded during batch ingestion 大量のトリプル一括書き込みでメモリ不足 バッチ書き込み、1バッチあたり1000件以下
8 Circular reference in graph エンティティ関係が循環しトラバーサルが無限ループに visitedセットで重複アクセスを防止
9 Community summary hallucination LLMがコミュニティ要約でハルシネーションを生成 プロンプトで「与えられた関係のみから生成」を強調
10 Hybrid search returns no results グラフトラバーサルとベクトル検索の両方でマッチなし 類似度閾値を緩和、フォールバック全文検索を追加

高度な最適化:4つのキーテクニック

1. エンティティ曖昧性解除とアライメント

from difflib import SequenceMatcher


class EntityDisambiguator:
    def __init__(self, similarity_threshold: float = 0.85):
        self.threshold = similarity_threshold

    def find_canonical(self, name: str,
                       known_entities: list[str]) -> str | None:
        best_match = None
        best_score = 0.0
        for entity in known_entities:
            score = SequenceMatcher(None, name, entity).ratio()
            if score > best_score and score >= self.threshold:
                best_score = score
                best_match = entity
        return best_match

    def disambiguate(self, entities: list[str]) -> dict[str, str]:
        canonical_map = {}
        unique = []
        for entity in entities:
            match = self.find_canonical(entity, unique)
            if match:
                canonical_map[entity] = match
            else:
                unique.append(entity)
                canonical_map[entity] = entity
        return canonical_map

2. 増分グラフ更新戦略

class IncrementalGraphUpdater:
    def __init__(self, graph_store: Neo4jGraphStore):
        self.store = graph_store

    def update_with_diff(self, new_triples: list[Triple],
                         existing_triples: list[Triple]) -> dict:
        existing_set = {
            (t.subject, t.predicate, t.object) for t in existing_triples
        }
        added, updated, skipped = [], [], []
        for triple in new_triples:
            key = (triple.subject, triple.predicate, triple.object)
            if key not in existing_set:
                self.store.add_triple(triple)
                added.append(key)
            else:
                skipped.append(key)
        return {"added": len(added), "updated": len(updated), "skipped": len(skipped)}

3. クエリルーティング:検索戦略の自動選択

class QueryRouter:
    def __init__(self):
        self.entity_patterns = [
            r"(.+?)と(.+?)の(.+?)", r"(.+?)の(.+?)は誰",
            r"(.+?)は(.+?)に所属", r"(.+?)は(.+?)に参加",
        ]

    def route(self, query: str) -> str:
        import re
        for pattern in self.entity_patterns:
            if re.search(pattern, query):
                return "graph"
        if len(query) > 50 or "まとめ" in query or "概要" in query:
            return "community"
        return "vector"

4. グラフ品質モニタリング

class GraphQualityMonitor:
    def __init__(self, graph_store: Neo4jGraphStore):
        self.store = graph_store

    def get_stats(self) -> dict:
        with self.store.driver.session() as session:
            node_count = session.run(
                "MATCH (n) RETURN count(n) AS count"
            ).single()["count"]
            edge_count = session.run(
                "MATCH ()-[r]->() RETURN count(r) AS count"
            ).single()["count"]
            isolated = session.run(
                "MATCH (n) WHERE NOT (n)--() RETURN count(n) AS count"
            ).single()["count"]
            avg_degree = (2 * edge_count / node_count) if node_count else 0
        return {
            "node_count": node_count,
            "edge_count": edge_count,
            "isolated_nodes": isolated,
            "avg_degree": round(avg_degree, 2),
            "edge_node_ratio": round(edge_count / node_count, 2) if node_count else 0,
        }

比較分析:4つのRAGアプローチ

次元 純粋ベクトルRAG GraphRAG ハイブリッドRAG 従来の検索
マルチホップ推論
意味理解 ★★★★ ★★★ ★★★★★ ★★
精密マッチング ★★ ★★★★ ★★★★ ★★★★★
グローバルな質問 ✓(コミュニティ要約)
構築コスト
クエリレイテンシ ~100ms ~200ms ~300ms ~50ms
保守の複雑さ
ハルシネーション率
典型的なユースケース 意味検索 関係推論 総合Q&A 精密検索

★が多いほどその次元で優れた性能;✓対応 △部分的対応 ✗非対応


まとめと展望

GraphRAGは2026年のRAGシステムアップグレードの重要な方向性になりつつある:

  1. 軽量GraphRAG:LightRAGなどのフレームワークがグラフ構築のハードルを下げ、Neo4jなしでも実行可能に
  2. 動的グラフ更新:ストリーミングエンティティ抽出+増分グラフ更新で、ナレッジグラフがリアルタイムに進化
  3. マルチモーダルナレッジグラフ:画像、テーブル、コードもエンティティとしてグラフに組み込み
  4. 適応型検索ルーティング:質問タイプに応じてベクトル/グラフ/ハイブリッド検索を自動選択
  5. GraphRAG評価の標準化:GraphRAG-Benchなどのベンチマークが体系的な比較を推進

GraphRAG選択の原則:まず本当にグラフが必要かを評価する。Q&Aシナリオが主にシングルホップの意味検索であれば、純粋なベクトルRAGで十分;マルチホップ関係推論が必須要件のときだけGraphRAGへの投資が価値がある。まずNetworkX+ローカルファイルで効果を検証し、証明されてからNeo4jに移行するのが推奨アプローチだ。


オンラインツール推薦

  • JSONフォーマッター — ナレッジグラフのトリプルと検索結果のJSON構造をフォーマット
  • ハッシュ計算 — エンティティ重複排除とトリプルフィンガープリントのMD5/SHAハッシュを計算
  • Curl→コード変換 — Neo4j APIとLLMインターフェースのデバッグcurlをPythonコードに変換

ブラウザローカルツールを無料で試す →

#RAG知识图谱#GraphRAG#Neo4j#向量检索#知识增强#Python#2026#AI与大数据