PostgreSQL pgvector RAG実践:エンベディング保存からハイブリッド検索までの7つのプロダクションパターン
チームがRAGの導入を決めた途端、アーキテクトがPineconeの請求書を提示——月額$2000から。さらに厄介なのは、ビジネスデータはPostgreSQLに、ベクトルデータはPineconeにあり、デュアルライトの整合性を誰が保証するのかという問題。2026年、pgvectorは「使える」から「優秀」へ進化:HNSWインデックスのパフォーマンス向上、halfvec半精度保存、ネイティブハイブリッド検索……既存のPostgreSQLこそが最良のベクトルデータベースです。
主要な収穫
- pgvector RAGの7つのプロダクションパターンを習得、テーブル設計からハイブリッド検索まで完全網羅
- HNSWとIVFFlatインデックスの選択戦略とパラメータチューニングを理解
- ベクトル+全文+キーワードの3ウェイハイブリッド検索とRRFリランキングを実装
- マルチテナント分離、パフォーマンスチューニング、バックアップリカバリなどの本番課題を解決
- そのまま使える完全なPython RAGコードを取得
目次
- pgvectorコア概念
- パターン1:エンベディング保存とテーブル設計
- パターン2:HNSWインデックス最適化
- パターン3:ハイブリッド検索(ベクトル+全文+キーワード)
- パターン4:リランキングと結果融合
- パターン5:マルチテナントデータ分離
- パターン6:パフォーマンスチューニングとクエリ最適化
- パターン7:本番デプロイとバックアップリカバリ
- 5つのよくある落とし穴と解決策
- 10のよくあるエラートラブルシューティング
- 高度な最適化テクニック
- 比較分析:pgvector vs Pinecone vs Milvus vs Qdrant
- オンラインツールおすすめ
pgvectorコア概念
RAGとベクトル検索の関係
RAG(Retrieval-Augmented Generation)のコアフロー:ユーザーが質問 → ベクトル検索で関連ドキュメントを発見 → ドキュメントをコンテキストとしてLLMに提供 → 回答を生成。ベクトル検索はRAGの基盤であり、pgvectorがPostgreSQLにこの能力をもたらします。
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ User Query │────▶│ Embedding │────▶│ pgvector │
│ ユーザー質問 │ │ Model │ │ 類似度検索 │
└─────────────┘ └──────────────┘ └──────┬───────┘
│
▼
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ LLM Answer │◀────│ Context + │◀────│ Top-K Docs │
│ 回答生成 │ │ Prompt │ │ 関連ドキュメント│
└─────────────┘ └──────────────┘ └──────────────┘
pgvector距離メトリクス
| メトリクス | 演算子 | インデックス演算子クラス | ユースケース | 値域 |
|---|---|---|---|---|
| コサイン距離 | <=> |
vector_cosine_ops |
意味的類似度(最も一般的) | [0, 2] |
| ユークリッド距離 | <-> |
vector_l2_ops |
空間距離 | [0, +∞) |
| 内積 | <#> |
vector_ip_ops |
正規化ベクトル | (-∞, +∞) |
pgvectorバージョン履歴
| バージョン | リリース | 主な機能 |
|---|---|---|
| 0.5 | 2023 | HNSWインデックス、半精度ベクトル |
| 0.6 | 2024 | 並列インデックス構築、スパースベクトル |
| 0.7 | 2024 | CONCURRENTLYサポート、halfvecインデックス |
| 0.8 | 2025 | SPARSEVEC型、量子化サポート |
| 0.9 | 2026 | HNSW構築速度改善、マルチコア並列 |
パターン1:エンベディング保存とテーブル設計
基本テーブル構造
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE documents (
id BIGSERIAL PRIMARY KEY,
tenant_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
source TEXT,
category TEXT,
tags TEXT[],
chunk_index INTEGER DEFAULT 0,
token_count INTEGER,
embedding vector(1536),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_documents_tenant ON documents(tenant_id);
CREATE INDEX idx_documents_category ON documents(category);
CREATE INDEX idx_documents_created ON documents(created_at DESC);
チャンク保存設計
RAGシステムでは、長いドキュメントをチャンクに分割し、それぞれにエンベディングを生成します:
CREATE TABLE document_chunks (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
token_count INTEGER,
embedding vector(1536),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_embedding ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
メタデータ+ベクトル統合テーブル
CREATE TABLE knowledge_base (
id BIGSERIAL PRIMARY KEY,
tenant_id INTEGER NOT NULL,
doc_type VARCHAR(50),
title TEXT NOT NULL,
content TEXT NOT NULL,
summary TEXT,
keywords TEXT[],
published_at DATE,
access_level INTEGER DEFAULT 0,
embedding vector(1536),
tsv tsvector GENERATED ALWAYS AS (
to_tsvector('english',
coalesce(title, '') || ' ' ||
coalesce(content, '') || ' ' ||
coalesce(array_to_string(keywords, ' '), '')
)
) STORED,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_kb_tsv ON knowledge_base USING gin(tsv);
CREATE INDEX idx_kb_tenant_type ON knowledge_base(tenant_id, doc_type);
CREATE INDEX idx_kb_embedding ON knowledge_base
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
Python一括エンベディング挿入
import psycopg2
from openai import OpenAI
from dataclasses import dataclass
client = OpenAI()
@dataclass
class Document:
title: str
content: str
source: str
category: str
def generate_embeddings(texts: list[str], batch_size: int = 100) -> list[list[float]]:
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
response = client.embeddings.create(
model="text-embedding-3-small",
input=batch,
)
all_embeddings.extend([d.embedding for d in response.data])
return all_embeddings
def chunk_text(text: str, max_tokens: int = 500, overlap: int = 50) -> list[str]:
words = text.split()
chunks = []
step = max_tokens - overlap
for i in range(0, len(words), step):
chunk = " ".join(words[i:i + max_tokens])
chunks.append(chunk)
return chunks
def insert_documents_with_chunks(
conn_params: dict,
docs: list[Document],
):
conn = psycopg2.connect(**conn_params)
cur = conn.cursor()
for doc in docs:
cur.execute(
"""INSERT INTO documents (tenant_id, title, content, source, category)
VALUES (1, %s, %s, %s, %s) RETURNING id""",
(doc.title, doc.content, doc.source, doc.category),
)
doc_id = cur.fetchone()[0]
chunks = chunk_text(doc.content)
texts = [f"{doc.title}\n{c}" for c in chunks]
embeddings = generate_embeddings(texts)
for idx, (chunk, emb) in enumerate(zip(chunks, embeddings)):
cur.execute(
"""INSERT INTO document_chunks
(document_id, chunk_index, content, embedding)
VALUES (%s, %s, %s, %s)""",
(doc_id, idx, chunk, str(emb)),
)
conn.commit()
cur.close()
conn.close()
insert_documents_with_chunks(
conn_params={"dbname": "mydb", "user": "postgres", "password": "password"},
docs=[
Document(
title="pgvectorパフォーマンスチューニングガイド",
content="pgvectorはPostgreSQLのベクトル検索拡張機能です...",
source="toolsku.dev/blog",
category="データベース",
),
],
)
パターン2:HNSWインデックス最適化
HNSWパラメータ詳細
HNSWインデックス構造
┌──────────────────────────┐
│ Layer 2 (希薄) │
│ ○──────○ │
│ / \ │
├─────/──────────\────────┤
│ Layer 1 (中程度) │
│ ○──○──○──○──○ │
│ / \ | | / \ │
├─/──\─│───│─/──\─────────┤
│ Layer 0 (密) │
│ ○─○─○─○─○─○─○─○─○ │
│ m=16 ノード最大16エッジ │
└──────────────────────────┘
ef_construction: 構築時検索幅(大きいほど精度向上、構築遅い)
m: ノードあたりの最大接続数(大きいほどリコール向上、メモリ増加)
ef_search: クエリ時検索幅(大きいほどリコール向上、クエリ遅い)
| パラメータ | デフォルト | 推奨範囲 | 影響 |
|---|---|---|---|
m |
16 | 8-64 | 接続度、大きいほどリコール向上・メモリ増加 |
ef_construction |
64 | 32-200 | 構築品質、大きいほど良いインデックス・構築遅い |
ef_search |
40 | 20-200 | クエリ精度、大きいほどリコール向上・クエリ遅い |
シナリオ別インデックスパラメータ
-- 高リコール(法律/医療ドキュメント検索)
CREATE INDEX idx_docs_embedding_high_recall ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 48, ef_construction = 128);
-- バランス(汎用RAG)
CREATE INDEX idx_docs_embedding_balanced ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- 低レイテンシ(リアルタイムレコメンデーション)
CREATE INDEX idx_docs_embedding_low_latency ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 8, ef_construction = 32);
IVFFlatインデックス(大規模データ代替)
CREATE INDEX idx_docs_embedding_ivf ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
| インデックス | 構築時間(100万行) | クエリレイテンシ | リコール | 増分対応 | メモリ |
|---|---|---|---|---|---|
| HNSW m=16 | ~15min | ~2ms | 98% | はい | 2-3x |
| HNSW m=48 | ~45min | ~5ms | 99.5% | はい | 4-5x |
| IVFFlat lists=100 | ~3min | ~3ms | 90% | いいえ(REINDEX必要) | 1-1.5x |
クエリ時の動的精度調整
SET LOCAL hnsw.ef_search = 200;
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 10;
SET LOCAL hnsw.ef_search = 20;
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 5;
パターン3:ハイブリッド検索(ベクトル+全文+キーワード)
なぜハイブリッド検索が必要か
純粋なベクトル検索は意味的マッチングに優れていますが、正確なキーワードを見落とす可能性があります。純粋な全文検索はキーワードマッチングに優れていますが、意味を理解できません。ハイブリッド検索は両方の利点を組み合わせ、2026年のRAGシステムの標準となっています。
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ ベクトル検索 │ │ 全文検索 │ │ キーワード検索│
│ (pgvector) │ │ (tsvector) │ │ (LIKE/ILIKE) │
│ 意味マッチ │ │ 単語頻度 │ │ 完全一致 │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────┐
│ RRF (Reciprocal Rank Fusion) │
│ score = Σ 1/(k + rank_i) k=60 │
└──────────────────────┬───────────────────────────┘
│
▼
┌──────────────────┐
│ 最終結果 │
└──────────────────┘
3ウェイハイブリッド検索SQL
WITH vector_results AS (
SELECT id, title, content,
ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS v_rank,
1 - (embedding <=> $1::vector) AS v_score
FROM knowledge_base
WHERE 1 - (embedding <=> $1::vector) > 0.5
ORDER BY embedding <=> $1::vector
LIMIT 50
),
fulltext_results AS (
SELECT id, title, content,
ROW_NUMBER() OVER (
ORDER BY ts_rank(tsv, plainto_tsquery('english', $2)) DESC
) AS ft_rank,
ts_rank(tsv, plainto_tsquery('english', $2)) AS ft_score
FROM knowledge_base
WHERE tsv @@ plainto_tsquery('english', $2)
LIMIT 50
),
keyword_results AS (
SELECT id, title, content,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN title ILIKE '%' || $2 || '%' THEN 0 ELSE 1 END,
CASE WHEN content ILIKE '%' || $2 || '%' THEN 0 ELSE 1 END
) AS kw_rank
FROM knowledge_base
WHERE title ILIKE '%' || $2 || '%'
OR content ILIKE '%' || $2 || '%'
LIMIT 50
),
rrf_combined AS (
SELECT
COALESCE(v.id, f.id, k.id) AS id,
COALESCE(v.title, f.title, k.title) AS title,
COALESCE(v.content, f.content, k.content) AS content,
COALESCE(1.0 / (60 + v.v_rank), 0) * 0.5 +
COALESCE(1.0 / (60 + f.ft_rank), 0) * 0.3 +
COALESCE(1.0 / (60 + k.kw_rank), 0) * 0.2 AS rrf_score
FROM vector_results v
FULL OUTER JOIN fulltext_results f ON v.id = f.id
FULL OUTER JOIN keyword_results k ON v.id = k.id
)
SELECT id, title, content, rrf_score
FROM rrf_combined
ORDER BY rrf_score DESC
LIMIT 10;
Pythonハイブリッド検索ラッパー
import psycopg2
from openai import OpenAI
from dataclasses import dataclass
@dataclass
class HybridSearchResult:
id: int
title: str
content: str
score: float
class PgVectorHybridSearch:
def __init__(self, db_url: str, openai_api_key: str):
self.conn = psycopg2.connect(db_url)
self.client = OpenAI(api_key=openai_api_key)
def embed(self, text: str) -> list[float]:
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def hybrid_search(
self,
query: str,
top_k: int = 10,
vector_weight: float = 0.5,
fulltext_weight: float = 0.3,
keyword_weight: float = 0.2,
category: str | None = None,
) -> list[HybridSearchResult]:
query_embedding = str(self.embed(query))
category_filter = "AND category = %s" if category else ""
params = [query_embedding, query_embedding]
if category:
params.append(category)
params.extend([query_embedding, query])
if category:
params.append(category)
params.extend([query, query])
if category:
params.append(category)
params.extend([query_embedding, query, top_k])
sql = f"""
WITH vector_results AS (
SELECT id, title, content,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS v_rank
FROM knowledge_base
WHERE 1 - (embedding <=> %s::vector) > 0.5
{category_filter}
ORDER BY embedding <=> %s::vector
LIMIT 50
),
fulltext_results AS (
SELECT id, title, content,
ROW_NUMBER() OVER (
ORDER BY ts_rank(tsv, plainto_tsquery('english', %s)) DESC
) AS ft_rank
FROM knowledge_base
WHERE tsv @@ plainto_tsquery('english', %s)
{category_filter}
LIMIT 50
),
keyword_results AS (
SELECT id, title, content,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN title ILIKE '%%' || %s || '%%' THEN 0 ELSE 1 END
) AS kw_rank
FROM knowledge_base
WHERE title ILIKE '%%' || %s || '%%'
OR content ILIKE '%%' || %s || '%%'
{category_filter}
LIMIT 50
),
rrf_combined AS (
SELECT
COALESCE(v.id, f.id, k.id) AS id,
COALESCE(v.title, f.title, k.title) AS title,
COALESCE(v.content, f.content, k.content) AS content,
COALESCE(1.0 / (60 + v.v_rank), 0) * {vector_weight} +
COALESCE(1.0 / (60 + f.ft_rank), 0) * {fulltext_weight} +
COALESCE(1.0 / (60 + k.kw_rank), 0) * {keyword_weight} AS rrf_score
FROM vector_results v
FULL OUTER JOIN fulltext_results f ON v.id = f.id
FULL OUTER JOIN keyword_results k ON v.id = k.id
)
SELECT id, title, content, rrf_score
FROM rrf_combined
ORDER BY rrf_score DESC
LIMIT %s
"""
cur = self.conn.cursor()
cur.execute(sql, params)
results = cur.fetchall()
return [
HybridSearchResult(id=r[0], title=r[1], content=r[2], score=r[3])
for r in results
]
def close(self):
self.conn.close()
パターン4:リランキングと結果融合
なぜリランキングが必要か
RRF融合後の結果はランキング位置に基づいており、意味的関連性に基づいていません。Cross-Encoderを使用したリランキングは候補をペアワイズで再スコアリングし、最終結果の品質を大幅に向上させます。
┌──────────────┐
│ ハイブリッド │
│ Top-50候補 │
└──────┬───────┘
│
▼
┌──────────────┐ ┌──────────────┐
│ Reranker │────▶│ 精密結果 │
│ Cross-Encoder│ │ Top-10 │
│ ペアワイズ │ │ 高品質 │
└──────────────┘ └──────────────┘
PostgreSQL内リランキング(ビジネスルールベース)
WITH hybrid_results AS (
SELECT id, title, content, rrf_score
FROM rrf_combined
ORDER BY rrf_score DESC
LIMIT 30
)
SELECT
id, title, content,
rrf_score AS base_score,
rrf_score
* CASE
WHEN published_at > CURRENT_DATE - INTERVAL '30 days' THEN 1.2
WHEN published_at > CURRENT_DATE - INTERVAL '90 days' THEN 1.1
ELSE 1.0
END
* CASE
WHEN doc_type = 'official' THEN 1.3
WHEN doc_type = 'tutorial' THEN 1.1
ELSE 1.0
END AS final_score
FROM hybrid_results h
JOIN knowledge_base kb ON h.id = kb.id
ORDER BY final_score DESC
LIMIT 10;
Python Cross-Encoderリランキング
from sentence_transformers import CrossEncoder
import psycopg2
from openai import OpenAI
class RerankingRAG:
def __init__(self, db_url: str):
self.conn = psycopg2.connect(db_url)
self.client = OpenAI()
self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
def embed(self, text: str) -> list[float]:
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def search_with_rerank(self, query: str, top_k: int = 10, candidate_size: int = 30):
query_embedding = str(self.embed(query))
cur = self.conn.cursor()
cur.execute(
"""SELECT id, title, content, 1 - (embedding <=> %s::vector) AS similarity
FROM knowledge_base
WHERE 1 - (embedding <=> %s::vector) > 0.5
ORDER BY embedding <=> %s::vector
LIMIT %s""",
(query_embedding, query_embedding, query_embedding, candidate_size),
)
candidates = cur.fetchall()
pairs = [(query, f"{row[1]}\n{row[2]}") for row in candidates]
rerank_scores = self.reranker.predict(pairs)
results = []
for row, score in zip(candidates, rerank_scores):
results.append({
"id": row[0],
"title": row[1],
"content": row[2],
"vector_score": float(row[3]),
"rerank_score": float(score),
})
results.sort(key=lambda x: x["rerank_score"], reverse=True)
return results[:top_k]
def close(self):
self.conn.close()
パターン5:マルチテナントデータ分離
行レベルセキュリティ(RLS)
ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON knowledge_base
USING (tenant_id = current_setting('app.current_tenant_id')::INTEGER);
SET app.current_tenant_id = '42';
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM knowledge_base
ORDER BY embedding <=> $1::vector
LIMIT 10;
パーティションベースの分離
CREATE TABLE knowledge_base_partitioned (
LIKE knowledge_base INCLUDING DEFAULTS
) PARTITION BY LIST (tenant_id);
CREATE TABLE kb_tenant_1 PARTITION OF knowledge_base_partitioned FOR VALUES IN (1);
CREATE TABLE kb_tenant_2 PARTITION OF knowledge_base_partitioned FOR VALUES IN (2);
CREATE TABLE kb_tenant_3 PARTITION OF knowledge_base_partitioned FOR VALUES IN (3);
CREATE TABLE kb_tenant_default PARTITION OF knowledge_base_partitioned DEFAULT;
CREATE INDEX idx_kb_t1_embedding ON kb_tenant_1
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
CREATE INDEX idx_kb_t2_embedding ON kb_tenant_2
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
Pythonマルチテナント検索
import psycopg2
from openai import OpenAI
from contextlib import contextmanager
@contextmanager
def tenant_session(conn: psycopg2.extensions.connection, tenant_id: int):
cur = conn.cursor()
cur.execute("SET app.current_tenant_id = %s", (str(tenant_id),))
try:
yield conn
finally:
cur.execute("RESET app.current_tenant_id")
class MultiTenantRAG:
def __init__(self, db_url: str, openai_api_key: str):
self.db_url = db_url
self.client = OpenAI(api_key=openai_api_key)
def embed(self, text: str) -> list[float]:
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def search(self, tenant_id: int, query: str, top_k: int = 10):
conn = psycopg2.connect(self.db_url)
query_embedding = str(self.embed(query))
with tenant_session(conn, tenant_id):
cur = conn.cursor()
cur.execute(
"""SELECT id, title, content, 1 - (embedding <=> %s::vector) AS similarity
FROM knowledge_base
WHERE 1 - (embedding <=> %s::vector) > 0.6
ORDER BY embedding <=> %s::vector
LIMIT %s""",
(query_embedding, query_embedding, query_embedding, top_k),
)
results = cur.fetchall()
conn.close()
return [
{"id": r[0], "title": r[1], "content": r[2], "score": float(r[3])}
for r in results
]
パターン6:パフォーマンスチューニングとクエリ最適化
主要PostgreSQLパラメータ
SET work_mem = '256MB';
SET maintenance_work_mem = '1GB';
SET max_parallel_workers_per_gather = 4;
SET hnsw.ef_search = 40;
SET ivfflat.probes = 10;
EXPLAIN ANALYZE分析
EXPLAIN ANALYZE
SELECT id, title, 1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM knowledge_base
WHERE 1 - (embedding <=> '[0.1, 0.2, ...]'::vector) > 0.7
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;
プリフィルター最適化
CREATE INDEX idx_kb_ai_embedding ON knowledge_base
USING hnsw (embedding vector_cosine_ops)
WHERE category = 'AI';
コネクションプール設定
# pgbouncer.ini
[databases]
mydb = host=127.0.0.1 port=5432 dbname=mydb
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 50
reserve_pool_size = 10
reserve_pool_timeout = 3
server_idle_timeout = 300
バッチ書き込み最適化
import psycopg2
from openai import OpenAI
import io
client = OpenAI()
def bulk_insert_with_copy(
conn_params: dict,
records: list[dict],
):
conn = psycopg2.connect(**conn_params)
cur = conn.cursor()
texts = [f"{r['title']}\n{r['content']}" for r in records]
embeddings = []
for i in range(0, len(texts), 100):
batch = texts[i:i + 100]
response = client.embeddings.create(
model="text-embedding-3-small",
input=batch,
)
embeddings.extend([d.embedding for d in response.data])
buffer = io.StringIO()
for record, emb in zip(records, embeddings):
line = "\t".join([
str(record.get("tenant_id", 1)),
record["title"].replace("\t", " ").replace("\n", " "),
record["content"].replace("\t", " ").replace("\n", " "),
str(emb),
])
buffer.write(line + "\n")
buffer.seek(0)
cur.copy_from(buffer, "knowledge_base", columns=[
"tenant_id", "title", "content", "embedding"
])
conn.commit()
cur.close()
conn.close()
パターン7:本番デプロイとバックアップリカバリ
Docker Compose本番設定
services:
postgres:
image: pgvector/pgvector:pg16
restart: always
environment:
POSTGRES_DB: rag_production
POSTGRES_USER: rag_app
POSTGRES_PASSWORD: ${PG_PASSWORD}
POSTGRES_INITDB_ARGS: "--encoding=UTF-8"
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
command: >
postgres
-c shared_buffers=4GB
-c work_mem=256MB
-c maintenance_work_mem=1GB
-c max_parallel_workers_per_gather=4
-c effective_cache_size=12GB
-c random_page_cost=1.1
healthcheck:
test: ["CMD-SHELL", "pg_isready -U rag_app -d rag_production"]
interval: 10s
timeout: 5s
retries: 5
pgbouncer:
image: edoburu/pgbouncer
restart: always
environment:
DB_HOST: postgres
DB_PORT: 5432
DB_USER: rag_app
DB_PASSWORD: ${PG_PASSWORD}
DB_NAME: rag_production
POOL_MODE: transaction
MAX_CLIENT_CONN: 1000
DEFAULT_POOL_SIZE: 50
ports:
- "6432:5432"
depends_on:
postgres:
condition: service_healthy
volumes:
pgdata:
driver: local
バックアップ戦略
#!/bin/bash
DB_NAME="rag_production"
DB_USER="rag_app"
BACKUP_DIR="/backups/pgvector"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p $BACKUP_DIR
pg_dump -U $DB_USER -d $DB_NAME -F c -f "${BACKUP_DIR}/full_${DATE}.dump"
pg_dump -U $DB_USER -d $DB_NAME -t knowledge_base -F c -f "${BACKUP_DIR}/kb_${DATE}.dump"
find $BACKUP_DIR -name "*.dump" -mtime +7 -delete
echo "Backup completed: ${DATE}"
リカバリ
pg_restore -U rag_app -d rag_production -c "${BACKUP_DIR}/full_20260616_030000.dump"
pg_restore -U rag_app -d rag_production -t knowledge_base "${BACKUP_DIR}/kb_20260616_030000.dump"
監視メトリクス
SELECT pg_size_pretty(pg_relation_size('idx_kb_embedding')) AS index_size;
SELECT pg_size_pretty(pg_total_relation_size('knowledge_base')) AS total_size;
SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE indexname = 'idx_kb_embedding';
SELECT sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)) AS cache_hit_ratio
FROM pg_statio_user_tables
WHERE relname = 'knowledge_base';
5つのよくある落とし穴と解決策
落とし穴1:HNSWインデックス構築OOM
現象:100万行のデータでHNSWインデックス構築時にメモリ不足。
解決策:maintenance_work_memを増加、ef_constructionを低下、CONCURRENTLYを使用。
SET maintenance_work_mem = '2GB';
CREATE INDEX CONCURRENTLY idx_docs_embedding ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 32);
落とし穴2:ベクトル次元の不一致
現象:ERROR: expected 1536 dimensions, not 768。
解決策:Embeddingモデルの出力次元がテーブル定義と一致することを確認。
| モデル | 出力次元 |
|---|---|
| text-embedding-3-small | 1536 |
| text-embedding-3-large | 3072 |
| text-embedding-ada-002 | 1536 |
| bge-large-en-v1.5 | 1024 |
| bge-small-en-v1.5 | 384 |
ALTER TABLE knowledge_base ALTER COLUMN embedding TYPE vector(768);
落とし穴3:IVFFlatインデックスのデータ更新後リコール急低下
現象:大量のINSERT後に検索結果が不正確に。
解決策:定期的にREINDEXするか、HNSWに切り替え。
REINDEX INDEX CONCURRENTLY idx_docs_embedding_ivf;
落とし穴4:ハイブリッド検索のパフォーマンス低下
現象:3ウェイハイブリッド検索に2秒以上かかる。
解決策:各検索の候補セットサイズを制限(各50件)、インデックスが使用されていることを確認。
落とし穴5:マルチテナントデータ漏洩
現象:テナントAのクエリがテナントBのデータを返す。
解決策:RLSを有効化、すべてのクエリにtenant_idフィルターを含める。
ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON knowledge_base
USING (tenant_id = current_setting('app.current_tenant_id')::INTEGER);
10のよくあるエラートラブルシューティング
| # | エラー | 原因 | 解決策 |
|---|---|---|---|
| 1 | extension "vector" not available |
pgvector未インストール | pgvector/pgvector Dockerイメージを使用またはソースからコンパイル |
| 2 | expected 1536 dimensions, not 768 |
ベクトル次元の不一致 | Embeddingモデルを確認、テーブル定義vector(N)を変更 |
| 3 | operator does not exist: vector <=> unknown |
型キャストの欠落 | $1::vector明示的キャストを使用 |
| 4 | index row size exceeds maximum 2712 |
HNSW行が大きすぎる | mパラメータを低下、またはhalfvec(1536)を使用 |
| 5 | cannot create index concurrently |
pgvectorバージョンが古い | 0.7+にアップグレード |
| 6 | out of memory during index build |
インデックス構築メモリ不足 | maintenance_work_memを増加、ef_constructionを低下 |
| 7 | invalid input syntax for type vector |
ベクトル形式エラー | [0.1, 0.2, ...]形式を使用 |
| 8 | permission denied for schema public |
CREATE EXTENSION権限なし | スーパーユーザーでCREATE EXTENSIONを実行 |
| 9 | IVFFlat recall drops after inserts |
新データがクラスタ中心から逸脱 | 定期的にREINDEXまたはHNSWに切り替え |
| 10 | sequential scan on vector column |
インデックスが使用されていない | 次元の一致、型キャスト、データ量を確認 |
高度な最適化テクニック
1. 半精度ベクトルで50%保存
ALTER TABLE knowledge_base ALTER COLUMN embedding TYPE halfvec(1536);
CREATE INDEX idx_kb_embedding_half ON knowledge_base
USING hnsw (embedding halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);
2. スパースベクトル
CREATE TABLE sparse_docs (
id BIGSERIAL PRIMARY KEY,
content TEXT,
embedding sparsevec(30000)
);
INSERT INTO sparse_docs (content, embedding)
VALUES ('example', '{1:0.1, 5:0.3, 100:0.7}/30000');
3. 時間ベースパーティショニング(ホット/コールド分離)
CREATE TABLE knowledge_base_partitioned (
LIKE knowledge_base INCLUDING DEFAULTS
) PARTITION BY RANGE (created_at);
CREATE TABLE kb_2026_q1 PARTITION OF knowledge_base_partitioned
FOR VALUES FROM ('2026-01-01') TO ('2026-04-01');
CREATE TABLE kb_2026_q2 PARTITION OF knowledge_base_partitioned
FOR VALUES FROM ('2026-04-01') TO ('2026-07-01');
CREATE TABLE kb_archive PARTITION OF knowledge_base_partitioned
DEFAULT;
ALTER TABLE kb_archive SET TABLESPACE slow_disk;
4. 非同期エンベディング更新
import psycopg2
import redis
import json
from openai import OpenAI
r = redis.Redis()
client = OpenAI()
def enqueue_embedding_update(doc_id: int, text: str):
r.lpush("embedding_queue", json.dumps({"doc_id": doc_id, "text": text}))
def process_embedding_queue():
conn = psycopg2.connect("dbname=mydb user=postgres password=password")
cur = conn.cursor()
while True:
_, data = r.brpop("embedding_queue", timeout=30)
if data is None:
continue
task = json.loads(data)
response = client.embeddings.create(
model="text-embedding-3-small",
input=task["text"],
)
embedding = response.data[0].embedding
cur.execute(
"UPDATE knowledge_base SET embedding = %s WHERE id = %s",
(str(embedding), task["doc_id"]),
)
conn.commit()
cur.close()
conn.close()
5. クエリキャッシュ
import hashlib
import json
import redis
r = redis.Redis()
def cached_vector_search(query: str, search_fn, ttl: int = 3600):
cache_key = f"vec_search:{hashlib.md5(query.encode()).hexdigest()}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
results = search_fn(query)
r.setex(cache_key, ttl, json.dumps(results))
return results
比較分析:pgvector vs Pinecone vs Milvus vs Qdrant
| 項目 | pgvector | Pinecone | Milvus | Qdrant |
|---|---|---|---|---|
| デプロイ方式 | PG拡張 | フルマネージド | セルフ/マネージド | セルフ/マネージド |
| 運用コスト | 極小(PG再利用) | 高(クエリ課金) | 高 | 中 |
| ハイブリッドクエリ | ネイティブSQL | 限定的フィルタ | 限定的フィルタ | フィルタ |
| トランザクション | ACID | なし | なし | なし |
| 最大次元 | 16000 | 20000 | 32768 | 65535 |
| インデックスタイプ | HNSW/IVFFlat | 独自 | HNSW/IVF/DiskANN | HNSW |
| 水平スケール | Citus必要 | 自動 | 自動 | 自動 |
| 対応規模 | 百万規模 | 十億規模 | 十億規模 | 億規模 |
| データ整合性 | 強整合 | 結果整合 | 結果整合 | 結果整合 |
| 半精度サポート | halfvec | あり | あり | あり |
| マルチテナント | RLS/パーティション | ネームスペース | コレクション | コレクション |
| 学習曲線 | 低(SQL) | 低(API) | 高 | 中 |
| コスト(100万ベクトル) | $0(既存PG) | ~$70/月 | ~$50/月(セルフ) | ~$40/月(セルフ) |
選択ガイド:
- 既存PostgreSQL + 百万規模データ → pgvector(追加コストなし) | フルマネージド需要 + コストOK → Pinecone
- 十億規模 + 水平スケーリング → Milvus
- 中規模 + 柔軟なフィルタリング → Qdrant
オンラインツールおすすめ
- JSONフォーマッター(APIレスポンスデバッグ):/ja/json/format
- curl to Code(Embedding APIテスト):/ja/dev/curl-to-code
- SQLフォーマッター(クエリ美化):/ja/utils/sql-format
関連記事
まとめ
PostgreSQL pgvector RAGは2026年、中小規模のRAGシステムにおける最適解となっています。7つのプロダクションパターンは、テーブル設計、インデックス最適化、ハイブリッド検索、リランキング、マルチテナント分離から本番デプロイまでの完全なパイプラインをカバーしています。コアアドバンテージ:追加の運用コストなし、ネイティブSQLハイブリッドクエリ、ACIDトランザクション保証。百万規模のデータでHNSWインデックスは優れたパフォーマンスを発揮し、3ウェイハイブリッド検索(ベクトル+全文+キーワード)+ RRF融合 + Cross-Encoderリランキングは、純粋なベクトル検索をはるかに超えるリコールを提供します。データが億規模に達するか、水平スケーリングが必要な場合にのみ、専用ベクトルデータベースの検討が必要です。
参考リソース:
ブラウザローカルツールを無料で試す →