PostgreSQL + pgvectorベクトル検索:2026年にベクトルデータベースなしでRAG検索を実現

编程语言

PostgreSQL + pgvectorベクトル検索:2026年にベクトルデータベースなしでRAG検索を実現

RAGと聞いてPinecone、Milvus、Weaviate这样的なベクトルデータベースをすぐに思い浮かべませんか?単純なセマンティック検索機能のために全新的なデータベースシステムを導入し、運用コストが倍増し、データ同期に頭を悩ませていませんか?2026年、PostgreSQLのpgvector拡張は十分に成熟しています——既存のPostgreSQLでベクトル検索ができ、追加のベクトルデータベースは不要です。


背景知識

ベクトル検索の基礎

ベクトル検索の核心は、テキストや画像などの非構造化データを高次元ベクトル(Embedding)に変換し、ベクトル間の距離(コサイン類似度、ユークリッド距離など)を計算して意味的に最も近い結果を見つけることです。

概念 説明
Embedding データをベクトルにマッピング text-embedding-3-largeは3072次元ベクトルを出力
コサイン類似度 ベクトルの夾角のコサイン値、範囲[-1,1] 1は完全に同一
ユークリッド距離 ベクトル間の直線距離 0は完全に同一
内積 ベクトルのドット積 正規化ベクトルに適用
ANN検索 近似最近傍、精度を犠牲に速度を向上 HNSW、IVFFlat

pgvectorがサポートするインデックスタイプ

インデックス アルゴリズム 構築速度 クエリ速度 リコール メモリ使用量 適用シーン
HNSW グラフインデックス 遅い 速い 高い 高い 中小規模、高リコール
IVFFlat 転置インデックス 速い 低い 大規模、精度低下許容

問題分析

なぜ専用ベクトルデータベースを使わないのか?

  1. 運用コスト:新しいデータベースの追加は、バックアップ、監視、アップグレード、高可用性のすべてを追加で処理する必要がある
  2. データ整合性:業務データはPostgreSQL、ベクトルデータはベクトルデータベースにあり、双書き込みの整合性を保証するのが困難
  3. クエリ能力:ベクトルデータベースのフィルタリング能力は限られており、複雑な結合クエリができない
  4. 移行リスク:ベクトルデータベース市場は断片化しており、Pinecone/Milvus/Weaviate APIに互換性がない

pgvectorを使えば、PostgreSQL内でベクトル検索+業務フィルタリング+結合クエリを完了でき、ワンストップで解決できます。


ステップバイステップ実践

ステップ1:pgvector拡張のインストール

-- PostgreSQLに接続
psql -U postgres -d mydb

-- 拡張をインストール
CREATE EXTENSION IF NOT EXISTS vector;

-- バージョンを確認
SELECT extversion FROM pg_extension WHERE extname = 'vector';
-- 0.8.0+

Docker方式:

# docker-compose.yml
services:
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: mydb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:

ステップ2:ベクトルテーブルの作成

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    source TEXT,
    category TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    embedding vector(1536)
);

-- HNSWインデックスの作成(高リコールシーンに推奨)
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- IVFFlatインデックスの作成(大規模データに推奨)
-- CREATE INDEX ON documents
-- USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 100);

ステップ3:ベクトルの生成と保存

import psycopg2
from openai import OpenAI

client = OpenAI()
conn = psycopg2.connect("dbname=mydb user=postgres password=password")
cur = conn.cursor()

def generate_embedding(text: str) -> list[float]:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding

def insert_document(title: str, content: str, source: str, category: str):
    full_text = f"{title}\n{content}"
    embedding = generate_embedding(full_text)

    cur.execute(
        "INSERT INTO documents (title, content, source, category, embedding) VALUES (%s, %s, %s, %s, %s)",
        (title, content, source, category, str(embedding)),
    )
    conn.commit()

insert_document(
    "K8s Gateway APIガイド",
    "Gateway APIはKubernetesの次世代トラフィック管理標準...",
    "toolsku.dev/blog",
    "DevOps",
)

ステップ4:ベクトル検索クエリ

-- コサイン類似度検索
SELECT id, title, content, 1 - (embedding <=> $1) AS similarity
FROM documents
WHERE 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT 10;

-- フィルタ条件付きベクトル検索
SELECT id, title, 1 - (embedding <=> $1) AS similarity
FROM documents
WHERE category = 'DevOps'
  AND 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT 10;

ステップ5:ハイブリッドクエリ(ベクトル+全文)

-- 全文検索インデックスの作成
ALTER TABLE documents ADD COLUMN tsv tsvector
    GENERATED ALWAYS AS (to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(content, ''))) STORED;

CREATE INDEX ON documents USING gin(tsv);

-- ハイブリッドクエリ:ベクトル検索 + 全文検索 + RRF融合
WITH vector_results AS (
    SELECT id, title,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1) AS vector_rank,
           1 - (embedding <=> $1) AS vector_score
    FROM documents
    ORDER BY embedding <=> $1
    LIMIT 50
),
text_results AS (
    SELECT id, title,
           ROW_NUMBER() OVER (ORDER BY ts_rank(tsv, plainto_tsquery('simple', $2)) DESC) AS text_rank,
           ts_rank(tsv, plainto_tsquery('simple', $2)) AS text_score
    FROM documents
    WHERE tsv @@ plainto_tsquery('simple', $2)
    LIMIT 50
),
combined AS (
    SELECT
        COALESCE(v.id, t.id) AS id,
        COALESCE(v.title, t.title) AS title,
        COALESCE(1.0 / (60 + v.vector_rank), 0) +
        COALESCE(1.0 / (60 + t.text_rank), 0) AS rrf_score
    FROM vector_results v
    FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT id, title, rrf_score
FROM combined
ORDER BY rrf_score DESC
LIMIT 10;

完全コード:RAG検索システム

import psycopg2
from openai import OpenAI
from dataclasses import dataclass

@dataclass
class SearchResult:
    id: int
    title: str
    content: str
    score: float
    source: str

class PgVectorRAG:
    def __init__(self, db_url: str, openai_api_key: str):
        self.conn = psycopg2.connect(db_url)
        self.client = OpenAI(api_key=openai_api_key)

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def search(
        self,
        query: str,
        top_k: int = 10,
        threshold: float = 0.6,
        category: str | None = None,
        use_hybrid: bool = True,
    ) -> list[SearchResult]:
        query_embedding = self.embed(query)
        embedding_str = str(query_embedding)

        if use_hybrid:
            return self._hybrid_search(embedding_str, query, top_k, category)
        return self._vector_search(embedding_str, top_k, threshold, category)

    def _vector_search(
        self,
        embedding_str: str,
        top_k: int,
        threshold: float,
        category: str | None,
    ) -> list[SearchResult]:
        sql = """
            SELECT id, title, content, source,
                   1 - (embedding <=> %s::vector) AS similarity
            FROM documents
            WHERE 1 - (embedding <=> %s::vector) > %s
        """
        params: list = [embedding_str, embedding_str, threshold]

        if category:
            sql += " AND category = %s"
            params.append(category)

        sql += " ORDER BY embedding <=> %s::vector LIMIT %s"
        params.extend([embedding_str, top_k])

        cur = self.conn.cursor()
        cur.execute(sql, params)
        results = cur.fetchall()

        return [
            SearchResult(id=r[0], title=r[1], content=r[2], source=r[3], score=r[4])
            for r in results
        ]

    def _hybrid_search(
        self,
        embedding_str: str,
        query_text: str,
        top_k: int,
        category: str | None,
    ) -> list[SearchResult]:
        category_filter = "AND category = %s" if category else ""
        params: list = [embedding_str, embedding_str]

        if category:
            params.append(category)

        params.extend([embedding_str, query_text])

        if category:
            params.append(category)

        params.extend([embedding_str, top_k])

        sql = f"""
        WITH vector_results AS (
            SELECT id, title, content, source,
                   ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS v_rank
            FROM documents
            WHERE 1 - (embedding <=> %s::vector) > 0.5
            {category_filter}
            ORDER BY embedding <=> %s::vector
            LIMIT 50
        ),
        text_results AS (
            SELECT id, title, content, source,
                   ROW_NUMBER() OVER (ORDER BY ts_rank(tsv, plainto_tsquery('simple', %s)) DESC) AS t_rank
            FROM documents
            WHERE tsv @@ plainto_tsquery('simple', %s)
            {category_filter}
            LIMIT 50
        ),
        combined AS (
            SELECT
                COALESCE(v.id, t.id) AS id,
                COALESCE(v.title, t.title) AS title,
                COALESCE(v.content, t.content) AS content,
                COALESCE(v.source, t.source) AS source,
                COALESCE(1.0 / (60 + v.v_rank), 0) +
                COALESCE(1.0 / (60 + t.t_rank), 0) AS rrf_score
            FROM vector_results v
            FULL OUTER JOIN text_results t ON v.id = t.id
        )
        SELECT id, title, content, source, rrf_score
        FROM combined
        ORDER BY rrf_score DESC
        LIMIT %s
        """

        cur = self.conn.cursor()
        cur.execute(sql, params)
        results = cur.fetchall()

        return [
            SearchResult(id=r[0], title=r[1], content=r[2], source=r[3], score=r[4])
            for r in results
        ]

    def generate_answer(self, query: str, top_k: int = 5) -> str:
        results = self.search(query, top_k=top_k, use_hybrid=True)
        context = "\n\n".join(f"[{r.title}]\n{r.content}" for r in results)

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "以下の参考資料に基づいて質問に答えてください。参考資料に関連情報がない場合はその旨を述べてください。\n\n" + context},
                {"role": "user", "content": query},
            ],
        )
        return response.choices[0].message.content


if __name__ == "__main__":
    rag = PgVectorRAG(
        db_url="dbname=mydb user=postgres password=password",
        openai_api_key="sk-xxx",
    )

    answer = rag.generate_answer("K8s Gateway APIの設定方法は?")
    print(answer)

よくある落とし穴

落とし穴1:HNSWインデックスの構築時間が長すぎる

現象:百万件のデータでHNSWインデックスを作成するのに数時間かかり、その間テーブルがロックされる。

解決策CONCURRENTLYオプション(pgvector 0.7+)を使用するか、オフピーク時に構築する。ef_constructionパラメータを下げる(デフォルト64→32)と構築は速くなるが、リコール率がわずかに低下する。バッチ構築後にマージする。

落とし穴2:ベクトル次元の不一致による挿入失敗

現象ERROR: expected 1536 dimensions, not 768

解決策:Embeddingモデルの出力次元がテーブル定義のvector(N)と一致していることを確認する。text-embedding-3-smallは1536次元、text-embedding-3-largeは3072次元を出力する。テーブル定義を変更:ALTER TABLE documents ALTER COLUMN embedding TYPE vector(768)

落とし穴3:IVFFlatインデックスのデータ更新後のリコール率急降下

現象:大量のデータを追加した後、検索結果が著しく不正確になる。

解決策:IVFFlatのクラスタ中心はインデックス作成時に決定されるため、新規データによりズレが生じる。定期的にインデックスを再構築する:REINDEX INDEX documents_embedding_idx。またはHNSWインデックスを使用する(インクリメンタル対応)。

落とし穴4:ハイブリッドクエリのパフォーマンスが悪い

現象:ベクトル+全文ハイブリッドクエリに1秒以上かかる。

解決策:両方の検索がそれぞれインデックスを使用していることをEXPLAIN ANALYZEで確認する。各候補セットのサイズを制限(各50件)、RRF融合は少量データで高速。ハイブリッドクエリでのSELECT *の使用を避ける。

落とし穴5:pgvectorのメモリ使用量が大きすぎる

現象:HNSWインデックスのメモリ使用量が生データの2〜3倍。

解決策mパラメータを調整(デフォルト16→8)してグラフの接続度を下げ、メモリ使用量を削減するがリコール率は若干犠牲になる。IVFFlatをHNSWの代わりに使用する。半精度ベクトルの使用を検討:vector(1536)halfvec(1536)(pgvector 0.7+)。


エラートラブルシューティング

番号 エラーメッセージ 原因 解決方法
1 extension "vector" not available pgvectorがインストールされていない pgvector/pgvector Dockerイメージを使用するかソースからコンパイル
2 expected 1536 dimensions, not 768 ベクトル次元の不一致 Embeddingモデルを確認し、テーブルの次元定義を変更
3 operator does not exist: vector <=> unknown 型キャストが不足 %s::vectorで明示的キャストを使用
4 index row size exceeds maximum HNSWインデックス行が大きすぎる mパラメータを下げる、またはhalfvecを使用
5 cannot create index concurrently pgvectorバージョンがサポートしていない 0.7+にアップグレードするか非並行方式を使用
6 IVFFlat recall is low after inserts 新規データがクラスタ中心から逸脱 定期的にREINDEXするかHNSWに切り替え
7 out of memory during index build インデックス構築のメモリ不足 work_memを増やす、バッチ構築
8 invalid input syntax for type vector ベクトル形式エラー [0.1, 0.2, ...]形式を使用
9 permission denied for schema public ユーザーに拡張作成権限がない スーパーユーザーでCREATE EXTENSIONを実行
10 query timeout on hybrid search ハイブリッドクエリが遅すぎる 候補セットサイズを制限、インデックスを追加、EXPLAINを確認

高度な最適化

1. 半精度ベクトルでストレージを節約

-- halfvec(半精度浮動小数点)を使用、ストレージ半減
ALTER TABLE documents ALTER COLUMN embedding TYPE halfvec(1536);

-- HNSWインデックスの作成
CREATE INDEX ON documents
USING hnsw (embedding halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);

2. パーティションテーブルで大規模検索を高速化

CREATE TABLE documents_partitioned (
    LIKE documents INCLUDING DEFAULTS
) PARTITION BY LIST (category);

CREATE TABLE documents_devops PARTITION OF documents_partitioned FOR VALUES IN ('DevOps');
CREATE TABLE documents_frontend PARTITION OF documents_partitioned FOR VALUES IN ('フロントエンド工学');
CREATE TABLE documents_ai PARTITION OF documents_partitioned FOR VALUES IN ('AIとビッグデータ');

3. クエリ時のHNSW検索精度の調整

-- ef_searchを上げてリコール率を向上(デフォルト40)
SET hnsw.ef_search = 200;

-- ef_searchを下げてクエリを高速化
SET hnsw.ef_search = 20;

4. バッチベクトル生成の最適化

import psycopg2
from openai import OpenAI
import math

client = OpenAI()

def batch_embed(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=batch,
        )
        embeddings.extend([d.embedding for d in response.data])
    return embeddings

def bulk_insert_documents(docs: list[dict]):
    texts = [f"{d['title']}\n{d['content']}" for d in docs]
    embeddings = batch_embed(texts)

    cur = conn.cursor()
    for doc, emb in zip(docs, embeddings):
        cur.execute(
            "INSERT INTO documents (title, content, source, category, embedding) VALUES (%s, %s, %s, %s, %s)",
            (doc['title'], doc['content'], doc['source'], doc['category'], str(emb)),
        )
    conn.commit()

比較分析

項目 pgvector Pinecone Milvus Weaviate Qdrant
デプロイ方式 PostgreSQL拡張 フルマネージド セルフホスト/マネージド セルフホスト/マネージド セルフホスト/マネージド
運用コスト 極低(PGを再利用) 高(クエリ課金)
ハイブリッドクエリ ネイティブSQL 限定フィルタ 限定フィルタ GraphQLフィルタ フィルタ
トランザクション支援 ACID なし なし なし なし
最大次元 16000 20000 32768 65535 65535
インデックスタイプ HNSW/IVFFlat 独自 HNSW/IVF/DiskANN HNSW HNSW
水平スケーリング Citusが必要 自動 自動 自動 自動
適用規模 百万件レベル 十億件レベル 十億件レベル 億件レベル 億件レベル
データ整合性 強整合性 結局整合性 結局整合性 結局整合性 結局整合性

まとめと展望

まとめ:ほとんどのRAGアプリケーションのシーンでは、PostgreSQL + pgvectorで完全に十分です。その核心的な利点は、追加の運用コストがゼロ、ネイティブSQLハイブリッドクエリ、ACIDトランザクション保証です。百万件レベルのデータ量ではHNSWインデックスのパフォーマンスは優秀で、ハイブリッドクエリ(ベクトル+全文+RRF)のリコール率は純粋なベクトル検索を大幅に上回ります。データ量が億件レベルに達するか、水平スケーリングが必要な場合にのみ、専用のベクトルデータベースを検討する必要があります。pgvectorから始め、必要に応じて進化させることをお勧めします。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#PostgreSQL#pgvector#向量搜索#语义检索#RAG#HNSW#IVFFlat#混合查询