PostgreSQL pgvector RAG実践:エンベディング保存からハイブリッド検索までの7つのプロダクションパターン

数据库

チームがRAGの導入を決めた途端、アーキテクトがPineconeの請求書を提示——月額$2000から。さらに厄介なのは、ビジネスデータはPostgreSQLに、ベクトルデータはPineconeにあり、デュアルライトの整合性を誰が保証するのかという問題。2026年、pgvectorは「使える」から「優秀」へ進化:HNSWインデックスのパフォーマンス向上、halfvec半精度保存、ネイティブハイブリッド検索……既存のPostgreSQLこそが最良のベクトルデータベースです。


主要な収穫

  • pgvector RAGの7つのプロダクションパターンを習得、テーブル設計からハイブリッド検索まで完全網羅
  • HNSWとIVFFlatインデックスの選択戦略とパラメータチューニングを理解
  • ベクトル+全文+キーワードの3ウェイハイブリッド検索とRRFリランキングを実装
  • マルチテナント分離、パフォーマンスチューニング、バックアップリカバリなどの本番課題を解決
  • そのまま使える完全なPython RAGコードを取得

目次

  1. pgvectorコア概念
  2. パターン1:エンベディング保存とテーブル設計
  3. パターン2:HNSWインデックス最適化
  4. パターン3:ハイブリッド検索(ベクトル+全文+キーワード)
  5. パターン4:リランキングと結果融合
  6. パターン5:マルチテナントデータ分離
  7. パターン6:パフォーマンスチューニングとクエリ最適化
  8. パターン7:本番デプロイとバックアップリカバリ
  9. 5つのよくある落とし穴と解決策
  10. 10のよくあるエラートラブルシューティング
  11. 高度な最適化テクニック
  12. 比較分析:pgvector vs Pinecone vs Milvus vs Qdrant
  13. オンラインツールおすすめ

pgvectorコア概念

RAGとベクトル検索の関係

RAG(Retrieval-Augmented Generation)のコアフロー:ユーザーが質問 → ベクトル検索で関連ドキュメントを発見 → ドキュメントをコンテキストとしてLLMに提供 → 回答を生成。ベクトル検索はRAGの基盤であり、pgvectorがPostgreSQLにこの能力をもたらします。

┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│  User Query  │────▶│  Embedding   │────▶│  pgvector    │
│  ユーザー質問 │     │  Model       │     │  類似度検索   │
└─────────────┘     └──────────────┘     └──────┬───────┘
                                                 │
                                                 ▼
┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│  LLM Answer  │◀────│  Context +   │◀────│  Top-K Docs  │
│  回答生成     │     │  Prompt      │     │  関連ドキュメント│
└─────────────┘     └──────────────┘     └──────────────┘

pgvector距離メトリクス

メトリクス 演算子 インデックス演算子クラス ユースケース 値域
コサイン距離 <=> vector_cosine_ops 意味的類似度(最も一般的) [0, 2]
ユークリッド距離 <-> vector_l2_ops 空間距離 [0, +∞)
内積 <#> vector_ip_ops 正規化ベクトル (-∞, +∞)

pgvectorバージョン履歴

バージョン リリース 主な機能
0.5 2023 HNSWインデックス、半精度ベクトル
0.6 2024 並列インデックス構築、スパースベクトル
0.7 2024 CONCURRENTLYサポート、halfvecインデックス
0.8 2025 SPARSEVEC型、量子化サポート
0.9 2026 HNSW構築速度改善、マルチコア並列

パターン1:エンベディング保存とテーブル設計

基本テーブル構造

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
    id BIGSERIAL PRIMARY KEY,
    tenant_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    source TEXT,
    category TEXT,
    tags TEXT[],
    chunk_index INTEGER DEFAULT 0,
    token_count INTEGER,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_documents_tenant ON documents(tenant_id);
CREATE INDEX idx_documents_category ON documents(category);
CREATE INDEX idx_documents_created ON documents(created_at DESC);

チャンク保存設計

RAGシステムでは、長いドキュメントをチャンクに分割し、それぞれにエンベディングを生成します:

CREATE TABLE document_chunks (
    id BIGSERIAL PRIMARY KEY,
    document_id BIGINT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    token_count INTEGER,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_embedding ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

メタデータ+ベクトル統合テーブル

CREATE TABLE knowledge_base (
    id BIGSERIAL PRIMARY KEY,
    tenant_id INTEGER NOT NULL,
    doc_type VARCHAR(50),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    summary TEXT,
    keywords TEXT[],
    published_at DATE,
    access_level INTEGER DEFAULT 0,
    embedding vector(1536),
    tsv tsvector GENERATED ALWAYS AS (
        to_tsvector('english',
            coalesce(title, '') || ' ' ||
            coalesce(content, '') || ' ' ||
            coalesce(array_to_string(keywords, ' '), '')
        )
    ) STORED,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_kb_tsv ON knowledge_base USING gin(tsv);
CREATE INDEX idx_kb_tenant_type ON knowledge_base(tenant_id, doc_type);
CREATE INDEX idx_kb_embedding ON knowledge_base
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

Python一括エンベディング挿入

import psycopg2
from openai import OpenAI
from dataclasses import dataclass

client = OpenAI()

@dataclass
class Document:
    title: str
    content: str
    source: str
    category: str

def generate_embeddings(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=batch,
        )
        all_embeddings.extend([d.embedding for d in response.data])
    return all_embeddings

def chunk_text(text: str, max_tokens: int = 500, overlap: int = 50) -> list[str]:
    words = text.split()
    chunks = []
    step = max_tokens - overlap
    for i in range(0, len(words), step):
        chunk = " ".join(words[i:i + max_tokens])
        chunks.append(chunk)
    return chunks

def insert_documents_with_chunks(
    conn_params: dict,
    docs: list[Document],
):
    conn = psycopg2.connect(**conn_params)
    cur = conn.cursor()

    for doc in docs:
        cur.execute(
            """INSERT INTO documents (tenant_id, title, content, source, category)
               VALUES (1, %s, %s, %s, %s) RETURNING id""",
            (doc.title, doc.content, doc.source, doc.category),
        )
        doc_id = cur.fetchone()[0]

        chunks = chunk_text(doc.content)
        texts = [f"{doc.title}\n{c}" for c in chunks]
        embeddings = generate_embeddings(texts)

        for idx, (chunk, emb) in enumerate(zip(chunks, embeddings)):
            cur.execute(
                """INSERT INTO document_chunks
                   (document_id, chunk_index, content, embedding)
                   VALUES (%s, %s, %s, %s)""",
                (doc_id, idx, chunk, str(emb)),
            )

    conn.commit()
    cur.close()
    conn.close()

insert_documents_with_chunks(
    conn_params={"dbname": "mydb", "user": "postgres", "password": "password"},
    docs=[
        Document(
            title="pgvectorパフォーマンスチューニングガイド",
            content="pgvectorはPostgreSQLのベクトル検索拡張機能です...",
            source="toolsku.dev/blog",
            category="データベース",
        ),
    ],
)

パターン2:HNSWインデックス最適化

HNSWパラメータ詳細

                    HNSWインデックス構造
            ┌──────────────────────────┐
            │     Layer 2 (希薄)       │
            │       ○──────○          │
            │      /        \         │
            ├─────/──────────\────────┤
            │   Layer 1 (中程度)      │
            │   ○──○──○──○──○         │
            │  / \  |   |  / \        │
            ├─/──\─│───│─/──\─────────┤
            │ Layer 0 (密)            │
            │ ○─○─○─○─○─○─○─○─○      │
            │  m=16 ノード最大16エッジ │
            └──────────────────────────┘

    ef_construction: 構築時検索幅(大きいほど精度向上、構築遅い)
    m: ノードあたりの最大接続数(大きいほどリコール向上、メモリ増加)
    ef_search: クエリ時検索幅(大きいほどリコール向上、クエリ遅い)
パラメータ デフォルト 推奨範囲 影響
m 16 8-64 接続度、大きいほどリコール向上・メモリ増加
ef_construction 64 32-200 構築品質、大きいほど良いインデックス・構築遅い
ef_search 40 20-200 クエリ精度、大きいほどリコール向上・クエリ遅い

シナリオ別インデックスパラメータ

-- 高リコール(法律/医療ドキュメント検索)
CREATE INDEX idx_docs_embedding_high_recall ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 48, ef_construction = 128);

-- バランス(汎用RAG)
CREATE INDEX idx_docs_embedding_balanced ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- 低レイテンシ(リアルタイムレコメンデーション)
CREATE INDEX idx_docs_embedding_low_latency ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 8, ef_construction = 32);

IVFFlatインデックス(大規模データ代替)

CREATE INDEX idx_docs_embedding_ivf ON documents
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
インデックス 構築時間(100万行) クエリレイテンシ リコール 増分対応 メモリ
HNSW m=16 ~15min ~2ms 98% はい 2-3x
HNSW m=48 ~45min ~5ms 99.5% はい 4-5x
IVFFlat lists=100 ~3min ~3ms 90% いいえ(REINDEX必要) 1-1.5x

クエリ時の動的精度調整

SET LOCAL hnsw.ef_search = 200;
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 10;

SET LOCAL hnsw.ef_search = 20;
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 5;

パターン3:ハイブリッド検索(ベクトル+全文+キーワード)

なぜハイブリッド検索が必要か

純粋なベクトル検索は意味的マッチングに優れていますが、正確なキーワードを見落とす可能性があります。純粋な全文検索はキーワードマッチングに優れていますが、意味を理解できません。ハイブリッド検索は両方の利点を組み合わせ、2026年のRAGシステムの標準となっています。

┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  ベクトル検索  │  │  全文検索     │  │  キーワード検索│
│  (pgvector)  │  │  (tsvector)  │  │  (LIKE/ILIKE) │
│  意味マッチ   │  │  単語頻度     │  │  完全一致     │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                  │                  │
       ▼                  ▼                  ▼
┌──────────────────────────────────────────────────┐
│              RRF (Reciprocal Rank Fusion)         │
│  score = Σ 1/(k + rank_i)   k=60                 │
└──────────────────────┬───────────────────────────┘
                       │
                       ▼
              ┌──────────────────┐
              │  最終結果         │
              └──────────────────┘

3ウェイハイブリッド検索SQL

WITH vector_results AS (
    SELECT id, title, content,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS v_rank,
           1 - (embedding <=> $1::vector) AS v_score
    FROM knowledge_base
    WHERE 1 - (embedding <=> $1::vector) > 0.5
    ORDER BY embedding <=> $1::vector
    LIMIT 50
),
fulltext_results AS (
    SELECT id, title, content,
           ROW_NUMBER() OVER (
               ORDER BY ts_rank(tsv, plainto_tsquery('english', $2)) DESC
           ) AS ft_rank,
           ts_rank(tsv, plainto_tsquery('english', $2)) AS ft_score
    FROM knowledge_base
    WHERE tsv @@ plainto_tsquery('english', $2)
    LIMIT 50
),
keyword_results AS (
    SELECT id, title, content,
           ROW_NUMBER() OVER (
               ORDER BY
                   CASE WHEN title ILIKE '%' || $2 || '%' THEN 0 ELSE 1 END,
                   CASE WHEN content ILIKE '%' || $2 || '%' THEN 0 ELSE 1 END
           ) AS kw_rank
    FROM knowledge_base
    WHERE title ILIKE '%' || $2 || '%'
       OR content ILIKE '%' || $2 || '%'
    LIMIT 50
),
rrf_combined AS (
    SELECT
        COALESCE(v.id, f.id, k.id) AS id,
        COALESCE(v.title, f.title, k.title) AS title,
        COALESCE(v.content, f.content, k.content) AS content,
        COALESCE(1.0 / (60 + v.v_rank), 0) * 0.5 +
        COALESCE(1.0 / (60 + f.ft_rank), 0) * 0.3 +
        COALESCE(1.0 / (60 + k.kw_rank), 0) * 0.2 AS rrf_score
    FROM vector_results v
    FULL OUTER JOIN fulltext_results f ON v.id = f.id
    FULL OUTER JOIN keyword_results k ON v.id = k.id
)
SELECT id, title, content, rrf_score
FROM rrf_combined
ORDER BY rrf_score DESC
LIMIT 10;

Pythonハイブリッド検索ラッパー

import psycopg2
from openai import OpenAI
from dataclasses import dataclass

@dataclass
class HybridSearchResult:
    id: int
    title: str
    content: str
    score: float

class PgVectorHybridSearch:
    def __init__(self, db_url: str, openai_api_key: str):
        self.conn = psycopg2.connect(db_url)
        self.client = OpenAI(api_key=openai_api_key)

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        vector_weight: float = 0.5,
        fulltext_weight: float = 0.3,
        keyword_weight: float = 0.2,
        category: str | None = None,
    ) -> list[HybridSearchResult]:
        query_embedding = str(self.embed(query))
        category_filter = "AND category = %s" if category else ""
        params = [query_embedding, query_embedding]
        if category:
            params.append(category)
        params.extend([query_embedding, query])
        if category:
            params.append(category)
        params.extend([query, query])
        if category:
            params.append(category)
        params.extend([query_embedding, query, top_k])

        sql = f"""
        WITH vector_results AS (
            SELECT id, title, content,
                   ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS v_rank
            FROM knowledge_base
            WHERE 1 - (embedding <=> %s::vector) > 0.5
            {category_filter}
            ORDER BY embedding <=> %s::vector
            LIMIT 50
        ),
        fulltext_results AS (
            SELECT id, title, content,
                   ROW_NUMBER() OVER (
                       ORDER BY ts_rank(tsv, plainto_tsquery('english', %s)) DESC
                   ) AS ft_rank
            FROM knowledge_base
            WHERE tsv @@ plainto_tsquery('english', %s)
            {category_filter}
            LIMIT 50
        ),
        keyword_results AS (
            SELECT id, title, content,
                   ROW_NUMBER() OVER (
                       ORDER BY
                       CASE WHEN title ILIKE '%%' || %s || '%%' THEN 0 ELSE 1 END
                   ) AS kw_rank
            FROM knowledge_base
            WHERE title ILIKE '%%' || %s || '%%'
               OR content ILIKE '%%' || %s || '%%'
            {category_filter}
            LIMIT 50
        ),
        rrf_combined AS (
            SELECT
                COALESCE(v.id, f.id, k.id) AS id,
                COALESCE(v.title, f.title, k.title) AS title,
                COALESCE(v.content, f.content, k.content) AS content,
                COALESCE(1.0 / (60 + v.v_rank), 0) * {vector_weight} +
                COALESCE(1.0 / (60 + f.ft_rank), 0) * {fulltext_weight} +
                COALESCE(1.0 / (60 + k.kw_rank), 0) * {keyword_weight} AS rrf_score
            FROM vector_results v
            FULL OUTER JOIN fulltext_results f ON v.id = f.id
            FULL OUTER JOIN keyword_results k ON v.id = k.id
        )
        SELECT id, title, content, rrf_score
        FROM rrf_combined
        ORDER BY rrf_score DESC
        LIMIT %s
        """

        cur = self.conn.cursor()
        cur.execute(sql, params)
        results = cur.fetchall()

        return [
            HybridSearchResult(id=r[0], title=r[1], content=r[2], score=r[3])
            for r in results
        ]

    def close(self):
        self.conn.close()

パターン4:リランキングと結果融合

なぜリランキングが必要か

RRF融合後の結果はランキング位置に基づいており、意味的関連性に基づいていません。Cross-Encoderを使用したリランキングは候補をペアワイズで再スコアリングし、最終結果の品質を大幅に向上させます。

┌──────────────┐
│  ハイブリッド  │
│  Top-50候補   │
└──────┬───────┘
       │
       ▼
┌──────────────┐     ┌──────────────┐
│  Reranker     │────▶│  精密結果     │
│  Cross-Encoder│     │  Top-10      │
│  ペアワイズ   │     │  高品質       │
└──────────────┘     └──────────────┘

PostgreSQL内リランキング(ビジネスルールベース)

WITH hybrid_results AS (
    SELECT id, title, content, rrf_score
    FROM rrf_combined
    ORDER BY rrf_score DESC
    LIMIT 30
)
SELECT
    id, title, content,
    rrf_score AS base_score,
    rrf_score
        * CASE
            WHEN published_at > CURRENT_DATE - INTERVAL '30 days' THEN 1.2
            WHEN published_at > CURRENT_DATE - INTERVAL '90 days' THEN 1.1
            ELSE 1.0
        END
        * CASE
            WHEN doc_type = 'official' THEN 1.3
            WHEN doc_type = 'tutorial' THEN 1.1
            ELSE 1.0
        END AS final_score
FROM hybrid_results h
JOIN knowledge_base kb ON h.id = kb.id
ORDER BY final_score DESC
LIMIT 10;

Python Cross-Encoderリランキング

from sentence_transformers import CrossEncoder
import psycopg2
from openai import OpenAI

class RerankingRAG:
    def __init__(self, db_url: str):
        self.conn = psycopg2.connect(db_url)
        self.client = OpenAI()
        self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def search_with_rerank(self, query: str, top_k: int = 10, candidate_size: int = 30):
        query_embedding = str(self.embed(query))
        cur = self.conn.cursor()

        cur.execute(
            """SELECT id, title, content, 1 - (embedding <=> %s::vector) AS similarity
               FROM knowledge_base
               WHERE 1 - (embedding <=> %s::vector) > 0.5
               ORDER BY embedding <=> %s::vector
               LIMIT %s""",
            (query_embedding, query_embedding, query_embedding, candidate_size),
        )
        candidates = cur.fetchall()

        pairs = [(query, f"{row[1]}\n{row[2]}") for row in candidates]
        rerank_scores = self.reranker.predict(pairs)

        results = []
        for row, score in zip(candidates, rerank_scores):
            results.append({
                "id": row[0],
                "title": row[1],
                "content": row[2],
                "vector_score": float(row[3]),
                "rerank_score": float(score),
            })

        results.sort(key=lambda x: x["rerank_score"], reverse=True)
        return results[:top_k]

    def close(self):
        self.conn.close()

パターン5:マルチテナントデータ分離

行レベルセキュリティ(RLS)

ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON knowledge_base
    USING (tenant_id = current_setting('app.current_tenant_id')::INTEGER);

SET app.current_tenant_id = '42';

SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM knowledge_base
ORDER BY embedding <=> $1::vector
LIMIT 10;

パーティションベースの分離

CREATE TABLE knowledge_base_partitioned (
    LIKE knowledge_base INCLUDING DEFAULTS
) PARTITION BY LIST (tenant_id);

CREATE TABLE kb_tenant_1 PARTITION OF knowledge_base_partitioned FOR VALUES IN (1);
CREATE TABLE kb_tenant_2 PARTITION OF knowledge_base_partitioned FOR VALUES IN (2);
CREATE TABLE kb_tenant_3 PARTITION OF knowledge_base_partitioned FOR VALUES IN (3);
CREATE TABLE kb_tenant_default PARTITION OF knowledge_base_partitioned DEFAULT;

CREATE INDEX idx_kb_t1_embedding ON kb_tenant_1
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);
CREATE INDEX idx_kb_t2_embedding ON kb_tenant_2
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

Pythonマルチテナント検索

import psycopg2
from openai import OpenAI
from contextlib import contextmanager

@contextmanager
def tenant_session(conn: psycopg2.extensions.connection, tenant_id: int):
    cur = conn.cursor()
    cur.execute("SET app.current_tenant_id = %s", (str(tenant_id),))
    try:
        yield conn
    finally:
        cur.execute("RESET app.current_tenant_id")

class MultiTenantRAG:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.client = OpenAI(api_key=openai_api_key)

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def search(self, tenant_id: int, query: str, top_k: int = 10):
        conn = psycopg2.connect(self.db_url)
        query_embedding = str(self.embed(query))

        with tenant_session(conn, tenant_id):
            cur = conn.cursor()
            cur.execute(
                """SELECT id, title, content, 1 - (embedding <=> %s::vector) AS similarity
                   FROM knowledge_base
                   WHERE 1 - (embedding <=> %s::vector) > 0.6
                   ORDER BY embedding <=> %s::vector
                   LIMIT %s""",
                (query_embedding, query_embedding, query_embedding, top_k),
            )
            results = cur.fetchall()

        conn.close()
        return [
            {"id": r[0], "title": r[1], "content": r[2], "score": float(r[3])}
            for r in results
        ]

パターン6:パフォーマンスチューニングとクエリ最適化

主要PostgreSQLパラメータ

SET work_mem = '256MB';
SET maintenance_work_mem = '1GB';
SET max_parallel_workers_per_gather = 4;
SET hnsw.ef_search = 40;
SET ivfflat.probes = 10;

EXPLAIN ANALYZE分析

EXPLAIN ANALYZE
SELECT id, title, 1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM knowledge_base
WHERE 1 - (embedding <=> '[0.1, 0.2, ...]'::vector) > 0.7
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;

プリフィルター最適化

CREATE INDEX idx_kb_ai_embedding ON knowledge_base
    USING hnsw (embedding vector_cosine_ops)
    WHERE category = 'AI';

コネクションプール設定

# pgbouncer.ini
[databases]
mydb = host=127.0.0.1 port=5432 dbname=mydb

[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 50
reserve_pool_size = 10
reserve_pool_timeout = 3
server_idle_timeout = 300

バッチ書き込み最適化

import psycopg2
from openai import OpenAI
import io

client = OpenAI()

def bulk_insert_with_copy(
    conn_params: dict,
    records: list[dict],
):
    conn = psycopg2.connect(**conn_params)
    cur = conn.cursor()

    texts = [f"{r['title']}\n{r['content']}" for r in records]
    embeddings = []
    for i in range(0, len(texts), 100):
        batch = texts[i:i + 100]
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=batch,
        )
        embeddings.extend([d.embedding for d in response.data])

    buffer = io.StringIO()
    for record, emb in zip(records, embeddings):
        line = "\t".join([
            str(record.get("tenant_id", 1)),
            record["title"].replace("\t", " ").replace("\n", " "),
            record["content"].replace("\t", " ").replace("\n", " "),
            str(emb),
        ])
        buffer.write(line + "\n")

    buffer.seek(0)
    cur.copy_from(buffer, "knowledge_base", columns=[
        "tenant_id", "title", "content", "embedding"
    ])
    conn.commit()
    cur.close()
    conn.close()

パターン7:本番デプロイとバックアップリカバリ

Docker Compose本番設定

services:
  postgres:
    image: pgvector/pgvector:pg16
    restart: always
    environment:
      POSTGRES_DB: rag_production
      POSTGRES_USER: rag_app
      POSTGRES_PASSWORD: ${PG_PASSWORD}
      POSTGRES_INITDB_ARGS: "--encoding=UTF-8"
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command: >
      postgres
        -c shared_buffers=4GB
        -c work_mem=256MB
        -c maintenance_work_mem=1GB
        -c max_parallel_workers_per_gather=4
        -c effective_cache_size=12GB
        -c random_page_cost=1.1
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U rag_app -d rag_production"]
      interval: 10s
      timeout: 5s
      retries: 5

  pgbouncer:
    image: edoburu/pgbouncer
    restart: always
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_USER: rag_app
      DB_PASSWORD: ${PG_PASSWORD}
      DB_NAME: rag_production
      POOL_MODE: transaction
      MAX_CLIENT_CONN: 1000
      DEFAULT_POOL_SIZE: 50
    ports:
      - "6432:5432"
    depends_on:
      postgres:
        condition: service_healthy

volumes:
  pgdata:
    driver: local

バックアップ戦略

#!/bin/bash
DB_NAME="rag_production"
DB_USER="rag_app"
BACKUP_DIR="/backups/pgvector"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p $BACKUP_DIR

pg_dump -U $DB_USER -d $DB_NAME -F c -f "${BACKUP_DIR}/full_${DATE}.dump"
pg_dump -U $DB_USER -d $DB_NAME -t knowledge_base -F c -f "${BACKUP_DIR}/kb_${DATE}.dump"

find $BACKUP_DIR -name "*.dump" -mtime +7 -delete

echo "Backup completed: ${DATE}"

リカバリ

pg_restore -U rag_app -d rag_production -c "${BACKUP_DIR}/full_20260616_030000.dump"
pg_restore -U rag_app -d rag_production -t knowledge_base "${BACKUP_DIR}/kb_20260616_030000.dump"

監視メトリクス

SELECT pg_size_pretty(pg_relation_size('idx_kb_embedding')) AS index_size;
SELECT pg_size_pretty(pg_total_relation_size('knowledge_base')) AS total_size;

SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE indexname = 'idx_kb_embedding';

SELECT sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)) AS cache_hit_ratio
FROM pg_statio_user_tables
WHERE relname = 'knowledge_base';

5つのよくある落とし穴と解決策

落とし穴1:HNSWインデックス構築OOM

現象:100万行のデータでHNSWインデックス構築時にメモリ不足。

解決策maintenance_work_memを増加、ef_constructionを低下、CONCURRENTLYを使用。

SET maintenance_work_mem = '2GB';
CREATE INDEX CONCURRENTLY idx_docs_embedding ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 32);

落とし穴2:ベクトル次元の不一致

現象ERROR: expected 1536 dimensions, not 768

解決策:Embeddingモデルの出力次元がテーブル定義と一致することを確認。

モデル 出力次元
text-embedding-3-small 1536
text-embedding-3-large 3072
text-embedding-ada-002 1536
bge-large-en-v1.5 1024
bge-small-en-v1.5 384
ALTER TABLE knowledge_base ALTER COLUMN embedding TYPE vector(768);

落とし穴3:IVFFlatインデックスのデータ更新後リコール急低下

現象:大量のINSERT後に検索結果が不正確に。

解決策:定期的にREINDEXするか、HNSWに切り替え。

REINDEX INDEX CONCURRENTLY idx_docs_embedding_ivf;

落とし穴4:ハイブリッド検索のパフォーマンス低下

現象:3ウェイハイブリッド検索に2秒以上かかる。

解決策:各検索の候補セットサイズを制限(各50件)、インデックスが使用されていることを確認。

落とし穴5:マルチテナントデータ漏洩

現象:テナントAのクエリがテナントBのデータを返す。

解決策:RLSを有効化、すべてのクエリにtenant_idフィルターを含める。

ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON knowledge_base
    USING (tenant_id = current_setting('app.current_tenant_id')::INTEGER);

10のよくあるエラートラブルシューティング

# エラー 原因 解決策
1 extension "vector" not available pgvector未インストール pgvector/pgvector Dockerイメージを使用またはソースからコンパイル
2 expected 1536 dimensions, not 768 ベクトル次元の不一致 Embeddingモデルを確認、テーブル定義vector(N)を変更
3 operator does not exist: vector <=> unknown 型キャストの欠落 $1::vector明示的キャストを使用
4 index row size exceeds maximum 2712 HNSW行が大きすぎる mパラメータを低下、またはhalfvec(1536)を使用
5 cannot create index concurrently pgvectorバージョンが古い 0.7+にアップグレード
6 out of memory during index build インデックス構築メモリ不足 maintenance_work_memを増加、ef_constructionを低下
7 invalid input syntax for type vector ベクトル形式エラー [0.1, 0.2, ...]形式を使用
8 permission denied for schema public CREATE EXTENSION権限なし スーパーユーザーでCREATE EXTENSIONを実行
9 IVFFlat recall drops after inserts 新データがクラスタ中心から逸脱 定期的にREINDEXまたはHNSWに切り替え
10 sequential scan on vector column インデックスが使用されていない 次元の一致、型キャスト、データ量を確認

高度な最適化テクニック

1. 半精度ベクトルで50%保存

ALTER TABLE knowledge_base ALTER COLUMN embedding TYPE halfvec(1536);

CREATE INDEX idx_kb_embedding_half ON knowledge_base
    USING hnsw (embedding halfvec_cosine_ops)
    WITH (m = 16, ef_construction = 64);

2. スパースベクトル

CREATE TABLE sparse_docs (
    id BIGSERIAL PRIMARY KEY,
    content TEXT,
    embedding sparsevec(30000)
);

INSERT INTO sparse_docs (content, embedding)
VALUES ('example', '{1:0.1, 5:0.3, 100:0.7}/30000');

3. 時間ベースパーティショニング(ホット/コールド分離)

CREATE TABLE knowledge_base_partitioned (
    LIKE knowledge_base INCLUDING DEFAULTS
) PARTITION BY RANGE (created_at);

CREATE TABLE kb_2026_q1 PARTITION OF knowledge_base_partitioned
    FOR VALUES FROM ('2026-01-01') TO ('2026-04-01');
CREATE TABLE kb_2026_q2 PARTITION OF knowledge_base_partitioned
    FOR VALUES FROM ('2026-04-01') TO ('2026-07-01');
CREATE TABLE kb_archive PARTITION OF knowledge_base_partitioned
    DEFAULT;

ALTER TABLE kb_archive SET TABLESPACE slow_disk;

4. 非同期エンベディング更新

import psycopg2
import redis
import json
from openai import OpenAI

r = redis.Redis()
client = OpenAI()

def enqueue_embedding_update(doc_id: int, text: str):
    r.lpush("embedding_queue", json.dumps({"doc_id": doc_id, "text": text}))

def process_embedding_queue():
    conn = psycopg2.connect("dbname=mydb user=postgres password=password")
    cur = conn.cursor()

    while True:
        _, data = r.brpop("embedding_queue", timeout=30)
        if data is None:
            continue
        task = json.loads(data)
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=task["text"],
        )
        embedding = response.data[0].embedding

        cur.execute(
            "UPDATE knowledge_base SET embedding = %s WHERE id = %s",
            (str(embedding), task["doc_id"]),
        )
        conn.commit()

    cur.close()
    conn.close()

5. クエリキャッシュ

import hashlib
import json
import redis

r = redis.Redis()

def cached_vector_search(query: str, search_fn, ttl: int = 3600):
    cache_key = f"vec_search:{hashlib.md5(query.encode()).hexdigest()}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    results = search_fn(query)
    r.setex(cache_key, ttl, json.dumps(results))
    return results

比較分析:pgvector vs Pinecone vs Milvus vs Qdrant

項目 pgvector Pinecone Milvus Qdrant
デプロイ方式 PG拡張 フルマネージド セルフ/マネージド セルフ/マネージド
運用コスト 極小(PG再利用) 高(クエリ課金)
ハイブリッドクエリ ネイティブSQL 限定的フィルタ 限定的フィルタ フィルタ
トランザクション ACID なし なし なし
最大次元 16000 20000 32768 65535
インデックスタイプ HNSW/IVFFlat 独自 HNSW/IVF/DiskANN HNSW
水平スケール Citus必要 自動 自動 自動
対応規模 百万規模 十億規模 十億規模 億規模
データ整合性 強整合 結果整合 結果整合 結果整合
半精度サポート halfvec あり あり あり
マルチテナント RLS/パーティション ネームスペース コレクション コレクション
学習曲線 低(SQL) 低(API)
コスト(100万ベクトル) $0(既存PG) ~$70/月 ~$50/月(セルフ) ~$40/月(セルフ)

選択ガイド

  • 既存PostgreSQL + 百万規模データ → pgvector(追加コストなし) | フルマネージド需要 + コストOK → Pinecone
  • 十億規模 + 水平スケーリング → Milvus
  • 中規模 + 柔軟なフィルタリング → Qdrant

オンラインツールおすすめ


関連記事


まとめ

PostgreSQL pgvector RAGは2026年、中小規模のRAGシステムにおける最適解となっています。7つのプロダクションパターンは、テーブル設計、インデックス最適化、ハイブリッド検索、リランキング、マルチテナント分離から本番デプロイまでの完全なパイプラインをカバーしています。コアアドバンテージ:追加の運用コストなし、ネイティブSQLハイブリッドクエリ、ACIDトランザクション保証。百万規模のデータでHNSWインデックスは優れたパフォーマンスを発揮し、3ウェイハイブリッド検索(ベクトル+全文+キーワード)+ RRF融合 + Cross-Encoderリランキングは、純粋なベクトル検索をはるかに超えるリコールを提供します。データが億規模に達するか、水平スケーリングが必要な場合にのみ、専用ベクトルデータベースの検討が必要です。


参考リソース

ブラウザローカルツールを無料で試す →

#PostgreSQL#pgvector#RAG#向量检索#语义搜索#混合检索#2026#数据库