Python RAGのパフォーマンスが悪い?2026年RAGAS評価+6つの最適化戦略で精度を40%向上

AI与大数据

Python RAGのパフォーマンスが悪い?2026年RAGAS評価+6つの最適化戦略で精度を40%向上

RAGシステムをリリースしたのに、回答がいつも「もっともらしいが間違っている」?検索されたドキュメントが無関係で、生成された回答にハルシネーションが含まれ、ユーザーからの苦情が絶えない?これはRAGがダメなのではなく、体系的な評価と最適化ができていないだけです。2026年、RAGAS評価フレームワーク + 6つの最適化戦略で、RAGの精度を60%から85%以上に向上させることができます。


背景知識:RAG評価指標

RAGシステムには検索生成の2つの主要コンポーネントの評価が必要です。RAGASフレームワークはコア指標を定義しています:

指標 評価コンポーネント 意味 値の範囲
Context Precision 検索 検索結果における関連ドキュメントのランキング 0-1
Context Recall 検索 必要な情報が検索された割合 0-1
Faithfulness 生成 生成された回答と検索ドキュメントの整合性 0-1
Answer Relevancy 生成 回答と質問の関連性 0-1
Answer Similarity 生成 生成回答と参照回答の意味的類似度 0-1

問題分析:RAGパフォーマンス低下の6つの根本原因

根本原因 割合 現象
チャンク戦略の不適切 25% 重要な情報が切断または複数チャンクに分散
検索リコール率の低さ 20% 関連ドキュメントが検索されない
リランキングの欠如 15% 関連ドキュメントが下位にランク付け
クエリ表現の不適切 15% ユーザーの質問とドキュメントの表現が不一致
単一検索方式 10% ベクトル検索のみでキーワードの完全一致を見落とす
プロンプトエンジニアリングの不足 15% 生成プロンプトが検索内容を十分に活用していない

ステップ1:RAGASで評価ベースラインを構築

# rag_evaluation.py
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
)
from datasets import Dataset

eval_data = {
    "question": [
        "ゼロトラストアーキテクチャとは?",
        "IstioはmTLSをどのように実装していますか?",
        "SPIFFE IDのフォーマットは?",
    ],
    "contexts": [
        ["ゼロトラストアーキテクチャは、決して信頼せず常に検証するというコア原則を持つセキュリティモデルです..."],
        ["IstioはEnvoyサイドカープロキシを通じてmTLS証明書の発行とローテーションを自動管理します..."],
        ["SPIFFE IDのフォーマットはspiffe://<trust domain>/<workload identifier>です..."],
    ],
    "answer": [
        "ゼロトラストアーキテクチャは、決して信頼せず常に検証するというコア原則を持つセキュリティモデルで、ネットワーク境界に依存しません。",
        "IstioはEnvoyサイドカープロキシを通じてmTLSを実装し、証明書の発行、配布、ローテーションを自動管理します。",
        "SPIFFE IDのフォーマットはspiffe://<trust domain>/<workload identifier>で、trust domainがトラストドメインを識別します。",
    ],
    "ground_truth": [
        "ゼロトラストアーキテクチャは、決して信頼せず常に検証するというコア原則を持つセキュリティモデルで、すべてのリクエストに認証と認可が必要です。",
        "IstioはEnvoyサイドカープロキシインジェクションを通じてmTLSを実装し、サービス間通信に双方向TLS暗号化と認証を自動提供します。",
        "SPIFFE IDのフォーマットはspiffe://<trust domain>/<workload identifier>で、trust domainがトラストドメインを識別し、pathがワークロードを識別します。",
    ],
}

dataset = Dataset.from_dict(eval_data)

result = evaluate(
    dataset,
    metrics=[context_precision, context_recall, faithfulness, answer_relevancy],
)

print(result)
# 出力例:
# {'context_precision': 0.75, 'context_recall': 0.68, 'faithfulness': 0.82, 'answer_relevancy': 0.79}

最適化戦略1:チャンク戦略の最適化

# chunk_optimization.py
from langchain.text_splitter import RecursiveCharacterTextSplitter, SemanticChunker
from langchain_community.embeddings import OpenAIEmbeddings

class SmartChunker:
    def __init__(self, embeddings=None):
        self.embeddings = embeddings or OpenAIEmbeddings()
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=50,
            separators=["\n\n", "\n", "。", ".", " ", ""],
        )

    def chunk_with_structure(self, document: str, metadata: dict = None) -> list:
        """構造化チャンキング:見出し階層を保持"""
        chunks = []
        sections = document.split("\n\n")

        current_context = ""
        for section in sections:
            lines = section.strip().split("\n")
            is_heading = any(line.startswith("#") for line in lines)

            if is_heading:
                current_context = section.strip()
                sub_chunks = self.recursive_splitter.split_text(section)
                for chunk in sub_chunks:
                    chunks.append({
                        "content": chunk,
                        "context": current_context,
                        "metadata": metadata or {},
                    })
            else:
                sub_chunks = self.recursive_splitter.split_text(section)
                for chunk in sub_chunks:
                    chunks.append({
                        "content": chunk,
                        "context": current_context,
                        "metadata": metadata or {},
                    })

        return chunks

    def chunk_with_parent_child(self, document: str) -> list:
        """親子チャンキング:大チャンクは検索用、小チャンクは生成用"""
        parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=100)
        child_splitter = RecursiveCharacterTextSplitter(chunk_size=256, chunk_overlap=30)

        result = []
        parents = parent_splitter.split_text(document)

        for i, parent in enumerate(parents):
            children = child_splitter.split_text(parent)
            result.append({
                "parent_id": f"parent_{i}",
                "parent_content": parent,
                "children": [{"child_id": f"parent_{i}_child_{j}", "content": c} for j, c in enumerate(children)],
            })

        return result


chunker = SmartChunker()
doc = open("knowledge_base.md").read()
structured_chunks = chunker.chunk_with_structure(doc)
parent_child_chunks = chunker.chunk_with_parent_child(doc)

print(f"構造化チャンキング: {len(structured_chunks)} chunks")
print(f"親子チャンキング: {len(parent_child_chunks)} parent chunks")

最適化戦略2:検索リランキング

# reranker.py
from sentence_transformers import CrossEncoder
from typing import List, Dict

class Reranker:
    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int = 5,
        content_key: str = "content",
    ) -> List[Dict]:
        """検索結果のリランキング"""
        pairs = [(query, doc[content_key]) for doc in documents]
        scores = self.model.predict(pairs)

        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        results = []
        for doc, score in scored_docs[:top_k]:
            result = doc.copy()
            result["rerank_score"] = float(score)
            results.append(result)

        return results


reranker = Reranker()

query = "Istio mTLSの設定方法は?"
initial_results = [
    {"content": "Istioインストールガイド...", "score": 0.85},
    {"content": "mTLS設定手順:1. PeerAuthenticationを作成...", "score": 0.72},
    {"content": "Istioトラフィック管理の概要...", "score": 0.68},
]

reranked = reranker.rerank(query, initial_results, top_k=3)
for r in reranked:
    print(f"Score: {r['rerank_score']:.4f} | {r['content'][:50]}")

最適化戦略3:ハイブリッド検索

# hybrid_retriever.py
from typing import List, Dict
import numpy as np

class HybridRetriever:
    def __init__(self, vector_store, keyword_store, alpha: float = 0.7):
        self.vector_store = vector_store
        self.keyword_store = keyword_store
        self.alpha = alpha

    def search(self, query: str, top_k: int = 10) -> List[Dict]:
        """ハイブリッド検索:ベクトル検索 + キーワード検索"""
        vector_results = self.vector_store.similarity_search_with_score(query, k=top_k * 2)
        keyword_results = self.keyword_store.search(query, top_k=top_k * 2)

        vector_scores = {}
        for doc, score in vector_results:
            doc_id = doc.metadata.get("id", hash(doc.page_content))
            vector_scores[doc_id] = {
                "content": doc.page_content,
                "vector_score": 1.0 / (1.0 + score),
                "keyword_score": 0.0,
            }

        for doc in keyword_results:
            doc_id = doc.metadata.get("id", hash(doc.page_content))
            if doc_id in vector_scores:
                vector_scores[doc_id]["keyword_score"] = doc.score
            else:
                vector_scores[doc_id] = {
                    "content": doc.page_content,
                    "vector_score": 0.0,
                    "keyword_score": doc.score,
                }

        combined = []
        for doc_id, scores in vector_scores.items():
            hybrid_score = (
                self.alpha * scores["vector_score"]
                + (1 - self.alpha) * scores["keyword_score"]
            )
            combined.append({
                "content": scores["content"],
                "hybrid_score": hybrid_score,
                "vector_score": scores["vector_score"],
                "keyword_score": scores["keyword_score"],
            })

        combined.sort(key=lambda x: x["hybrid_score"], reverse=True)
        return combined[:top_k]

最適化戦略4:クエリ書き換え

# query_rewriter.py
from openai import OpenAI

class QueryRewriter:
    def __init__(self, model: str = "gpt-4o-mini"):
        self.client = OpenAI()
        self.model = model

    def rewrite(self, query: str, history: List[Dict] = None) -> str:
        """ユーザークエリを検索に適した形に書き換え"""
        messages = [
            {"role": "system", "content": """あなたはクエリ書き換えの専門家です。ユーザーの自然言語の質問を、ドキュメント検索に適した形に書き換えてください:
1. 省略されたコンテキストを補完(会話履歴から)
2. 口語表現を専門用語に変換
3. 複合質問をサブ質問に分解
4. 元の質問のコア意図を保持
書き換え後のクエリのみを出力し、説明は不要です。"""},
        ]

        if history:
            for msg in history[-3:]:
                messages.append({"role": msg["role"], "content": msg["content"]})

        messages.append({"role": "user", "content": f"元のクエリ: {query}\n書き換え後:"})

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.1,
            max_tokens=200,
        )

        return response.choices[0].message.content.strip()

    def expand_queries(self, query: str, n: int = 3) -> List[str]:
        """複数の検索バリアントを生成"""
        messages = [
            {"role": "system", "content": f"以下のクエリに対して{n}つの異なる角度からの検索バリアントを生成してください。1行に1つずつ。"},
            {"role": "user", "content": query},
        ]

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.5,
            max_tokens=300,
        )

        variants = response.choices[0].message.content.strip().split("\n")
        return [query] + [v.strip() for v in variants if v.strip()]


rewriter = QueryRewriter()
rewritten = rewriter.rewrite("あの相互認証の設定どうやるの?")
print(f"書き換え後: {rewritten}")

expanded = rewriter.expand_queries("Istio mTLS設定", n=3)
print(f"クエリバリアント: {expanded}")

最適化戦略5:コンテキスト圧縮

# context_compressor.py
from typing import List, Dict
from openai import OpenAI

class ContextCompressor:
    def __init__(self, model: str = "gpt-4o-mini"):
        self.client = OpenAI()
        self.model = model

    def compress(self, query: str, documents: List[str], max_tokens: int = 2000) -> str:
        """検索されたドキュメントを圧縮し、クエリに関連する部分のみを保持"""
        combined = "\n\n---\n\n".join(documents)

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": f"""以下のドキュメントからクエリに最も関連する内容を抽出してください。
要件:
1. 直接関連する情報のみを保持
2. 冗長性と重複を排除
3. 事実の正確性を維持
4. 出力は{max_tokens}トークン以下"""},
                {"role": "user", "content": f"クエリ: {query}\n\nドキュメント:\n{combined}"},
            ],
            temperature=0.0,
            max_tokens=max_tokens,
        )

        return response.choices[0].message.content

最適化戦略6:アダプティブ検索

# adaptive_retrieval.py
class AdaptiveRetriever:
    def __init__(self, hybrid_retriever, reranker, query_rewriter):
        self.hybrid_retriever = hybrid_retriever
        self.reranker = reranker
        self.query_rewriter = query_rewriter
        self.retrieval_stats = {"simple": 0, "expanded": 0, "rewritten": 0}

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """アダプティブ検索戦略の選択"""
        rewritten_query = self.query_rewriter.rewrite(query)

        if rewritten_query.lower() == query.lower():
            results = self.hybrid_retriever.search(query, top_k=top_k * 2)
            self.retrieval_stats["simple"] += 1
        else:
            expanded = self.query_rewriter.expand_queries(query, n=2)
            all_results = []
            for q in expanded:
                all_results.extend(self.hybrid_retriever.search(q, top_k=top_k))
            seen = set()
            results = []
            for r in all_results:
                key = hash(r["content"])
                if key not in seen:
                    seen.add(key)
                    results.append(r)
            self.retrieval_stats["expanded"] += 1

        reranked = self.reranker.rerank(rewritten_query, results, top_k=top_k)
        return reranked

よくある落とし穴

# 落とし穴 症状 解決策
1 チャンクサイズの一律適用 テーブル/コードが切断、意味が不完全 コンテンツタイプ別にchunk_sizeを使い分け:テキスト512、コード1024、テーブル256
2 埋め込みモデルとクエリの不一致 日本語クエリで英語ドキュメント、検索精度が低い 多言語埋め込みモデル bge-m3multilingual-e5-large を使用
3 リランキングモデルが遅い 検索+リランキングのレイテンシが2秒を超える 軽量リランカー bge-reranker-v2-m3 を使用、またはリランキング結果をキャッシュ
4 クエリ書き換えが意図から逸脱 書き換え後のクエリがユーザーのコア意図を喪失 temperature=0.1に設定、元のクエリをフォールバックとして保持
5 ハイブリッド検索のalpha値が固定 クエリタイプごとに最適なalphaが異なる 動的に調整:キーワードクエリalpha=0.3、セマンティッククエリalpha=0.8

エラートラブルシューティング

エラーメッセージ 原因 解決方法
ragas: openai API key not set OpenAI API Keyが未設定 OPENAI_API_KEY 環境変数を設定
CrossEncoder: model not found リランキングモデルが未ダウンロード huggingface-cli download BAAI/bge-reranker-v2-m3
ChromaDB: collection not found ベクトルDBコレクションが存在しない ドキュメント挿入前にコレクションを作成
Token limit exceeded 検索ドキュメントがLLMコンテキストウィンドウを超過 ContextCompressorで圧縮、またはtop_kを減少
CUDA out of memory 埋め込み/リランキングモデルのGPU OOM CPU推論を使用、またはより小さいモデルに切り替え
FAISS index not built ベクトルインデックスが未構築 まず index = faiss.IndexFlatIP(dimension)index.add(vectors)
JSON decode error in RAGAS 評価データフォーマットが不正 Datasetにquestion/contexts/answer/ground_truthの4列が含まれているか確認
RecursiveCharacterTextSplitter: empty chunk ドキュメントが空、またはセパレータの不一致 空チャンクフィルタリングを追加、ドキュメントエンコーディングを確認
OpenAI: rate limit exceeded APIレート制限を超過 リトライロジックとリクエスト間隔を追加
SentenceTransformer: SSL error HuggingFaceダウンロードがブロック HFミラーを設定 HF_ENDPOINT=https://hf-mirror.com

高度な最適化

1. 評価駆動の反復最適化

def optimization_loop(eval_dataset, strategies, max_iterations=5):
    """評価駆動の自動最適化ループ"""
    best_score = 0
    best_config = None

    for i in range(max_iterations):
        for strategy_name, strategy_fn in strategies.items():
            modified_dataset = strategy_fn(eval_dataset)
            result = evaluate(modified_dataset, metrics=[faithfulness, answer_relevancy])
            score = result['faithfulness'] * 0.5 + result['answer_relevancy'] * 0.5

            if score > best_score:
                best_score = score
                best_config = strategy_name
                print(f"イテレーション{i+1}: {strategy_name} スコア {score:.4f} (新しいベスト)")

    return best_config, best_score

2. マルチ粒度インデックス

インデックス粒度 チャンクサイズ 用途 検索方法
段落レベル 512 tokens 精密検索 ベクトル類似度
ドキュメントレベル 2000 tokens コンテキスト補完 親子検索
サマリーレベル 100 tokens 高速フィルタリング キーワードマッチング

3. A/Bテストフレームワーク

class RAGABTest:
    def __init__(self, variant_a, variant_b):
        self.variant_a = variant_a
        self.variant_b = variant_b
        self.results = {"a": [], "b": []}

    def run(self, queries: List[str], sample_ratio: float = 0.5):
        import random
        for query in queries:
            variant = "a" if random.random() < sample_ratio else "b"
            retriever = self.variant_a if variant == "a" else self.variant_b
            result = retriever.retrieve(query)
            self.results[variant].append({"query": query, "result": result})

    def compare(self):
        from ragas import evaluate
        score_a = evaluate(self._to_dataset(self.results["a"]))
        score_b = evaluate(self._to_dataset(self.results["b"]))
        return {"variant_a": score_a, "variant_b": score_b}

比較分析

最適化戦略 精度向上 レイテンシ影響 実装複雑度 推奨優先度
チャンク最適化 +10-15% なし ★★★★★
検索リランキング +8-12% +50-200ms ★★★★★
ハイブリッド検索 +5-10% +20ms ★★★★
クエリ書き換え +5-8% +100ms ★★★★
コンテキスト圧縮 +3-5% +200ms ★★★
アダプティブ検索 +5-10% +100ms ★★★
RAG評価フレームワーク 指標カバレッジ 使いやすさ LLM依存 オープンソース
RAGAS ★★★★★ ★★★★ はい はい
DeepEval ★★★★ ★★★★★ はい はい
TruLens ★★★ ★★★ はい はい
ARES ★★★★ ★★★ はい はい

まとめ:RAG最適化は単一のパラメータ調整で済むものではありません。「評価→最適化→再評価」のクローズドループプロセスです。まずRAGASでベースラインを構築し、6つの戦略で個別に対応します:チャンク最適化で情報切断を解決、リランキングでTop-K精度を向上、ハイブリッド検索でリコールを拡大、クエリ書き換えでセマンティックギャップを埋める、コンテキスト圧縮でノイズを削減、アダプティブ検索で最適なルートを選択。各戦略は5-15%の精度向上に貢献し、組み合わせると40%以上になります。2026年、評価なしのRAG最適化は盲象に等しいです。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#Python#RAG#评估框架#RAGAS#检索增强#向量检索#Chunk优化#重排序