PostgreSQL pgvector RAG實戰:從嵌入儲存到混合檢索的7種生產模式

数据库

你的團隊剛決定上RAG,架構師就甩出一份Pinecone帳單——每月$2000起。更頭痛的是,業務資料在PostgreSQL,向量資料在Pinecone,雙寫一致性誰來保證?2026年,pgvector已經從「能用」進化到「好用」:HNSW索引效能飆升、halfvec半精度儲存、原生混合檢索……你現有的PostgreSQL就是最好的向量資料庫。


核心收穫

  • 掌握pgvector RAG的7種生產級模式,從表設計到混合檢索全覆蓋
  • 理解HNSW與IVFFlat索引的選型策略與參數調優
  • 實作向量+全文+關鍵詞三路混合檢索與RRF重排序
  • 解決多租戶隔離、效能調優、備份恢復等生產難題
  • 獲得可直接使用的完整Python RAG程式碼

目錄

  1. pgvector核心概念
  2. Pattern 1:嵌入儲存與表設計
  3. Pattern 2:HNSW索引最佳化
  4. Pattern 3:混合檢索(向量+全文+關鍵詞)
  5. Pattern 4:重排序與結果融合
  6. Pattern 5:多租戶資料隔離
  7. Pattern 6:效能調優與查詢最佳化
  8. Pattern 7:生產部署與備份恢復
  9. 5個常見坑及解決方案
  10. 10個常見報錯排查
  11. 進階最佳化技巧
  12. 對比分析:pgvector vs Pinecone vs Milvus vs Qdrant
  13. 線上工具推薦

pgvector核心概念

RAG與向量檢索的關係

RAG(Retrieval-Augmented Generation)的核心流程:使用者提問 → 向量檢索找到相關文件 → 將文件作為上下文餵給LLM → 生成回答。向量檢索是RAG的基石,pgvector讓PostgreSQL具備了這一能力。

┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│  User Query  │────▶│  Embedding   │────▶│  pgvector    │
│  使用者提問   │     │  Model       │     │  相似度搜尋   │
└─────────────┘     └──────────────┘     └──────┬───────┘
                                                 │
                                                 ▼
┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│  LLM Answer  │◀────│  Context +   │◀────│  Top-K Docs  │
│  生成回答     │     │  Prompt      │     │  相關文件     │
└─────────────┘     └──────────────┘     └──────────────┘

pgvector距離度量

度量方式 運算子 索引運算子類 適用場景 值域
餘弦距離 <=> vector_cosine_ops 語意相似度(最常用) [0, 2]
歐氏距離 <-> vector_l2_ops 空間距離 [0, +∞)
內積 <#> vector_ip_ops 歸一化向量 (-∞, +∞)

pgvector版本演進

版本 釋出時間 關鍵特性
0.5 2023 HNSW索引、半精度向量
0.6 2024 平行索引建構、稀疏向量
0.7 2024 CONCURRENTLY支援、halfvec索引
0.8 2025 SPARSEVEC型別、量化支援
0.9 2026 改進HNSW建構速度、多核心平行

Pattern 1:嵌入儲存與表設計

基礎表結構

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
    id BIGSERIAL PRIMARY KEY,
    tenant_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    source TEXT,
    category TEXT,
    tags TEXT[],
    chunk_index INTEGER DEFAULT 0,
    token_count INTEGER,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_documents_tenant ON documents(tenant_id);
CREATE INDEX idx_documents_category ON documents(category);
CREATE INDEX idx_documents_created ON documents(created_at DESC);

分塊儲存設計

RAG系統中,長文件需要分塊(Chunking)後分別生成嵌入:

CREATE TABLE document_chunks (
    id BIGSERIAL PRIMARY KEY,
    document_id BIGINT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    token_count INTEGER,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_embedding ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

元資料+向量聯合表

CREATE TABLE knowledge_base (
    id BIGSERIAL PRIMARY KEY,
    tenant_id INTEGER NOT NULL,
    doc_type VARCHAR(50),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    summary TEXT,
    keywords TEXT[],
    published_at DATE,
    access_level INTEGER DEFAULT 0,
    embedding vector(1536),
    tsv tsvector GENERATED ALWAYS AS (
        to_tsvector('simple',
            coalesce(title, '') || ' ' ||
            coalesce(content, '') || ' ' ||
            coalesce(array_to_string(keywords, ' '), '')
        )
    ) STORED,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_kb_tsv ON knowledge_base USING gin(tsv);
CREATE INDEX idx_kb_tenant_type ON knowledge_base(tenant_id, doc_type);
CREATE INDEX idx_kb_embedding ON knowledge_base
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

Python批量嵌入寫入

import psycopg2
from openai import OpenAI
from dataclasses import dataclass

client = OpenAI()

@dataclass
class Document:
    title: str
    content: str
    source: str
    category: str

def generate_embeddings(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=batch,
        )
        all_embeddings.extend([d.embedding for d in response.data])
    return all_embeddings

def chunk_text(text: str, max_tokens: int = 500, overlap: int = 50) -> list[str]:
    words = text.split()
    chunks = []
    step = max_tokens - overlap
    for i in range(0, len(words), step):
        chunk = " ".join(words[i:i + max_tokens])
        chunks.append(chunk)
    return chunks

def insert_documents_with_chunks(
    conn_params: dict,
    docs: list[Document],
):
    conn = psycopg2.connect(**conn_params)
    cur = conn.cursor()

    for doc in docs:
        cur.execute(
            """INSERT INTO documents (tenant_id, title, content, source, category)
               VALUES (1, %s, %s, %s, %s) RETURNING id""",
            (doc.title, doc.content, doc.source, doc.category),
        )
        doc_id = cur.fetchone()[0]

        chunks = chunk_text(doc.content)
        texts = [f"{doc.title}\n{c}" for c in chunks]
        embeddings = generate_embeddings(texts)

        for idx, (chunk, emb) in enumerate(zip(chunks, embeddings)):
            cur.execute(
                """INSERT INTO document_chunks
                   (document_id, chunk_index, content, embedding)
                   VALUES (%s, %s, %s, %s)""",
                (doc_id, idx, chunk, str(emb)),
            )

    conn.commit()
    cur.close()
    conn.close()

insert_documents_with_chunks(
    conn_params={"dbname": "mydb", "user": "postgres", "password": "password"},
    docs=[
        Document(
            title="pgvector效能最佳化指南",
            content="pgvector是PostgreSQL的向量檢索擴充套件...",
            source="toolsku.dev/blog",
            category="資料庫",
        ),
    ],
)

Pattern 2:HNSW索引最佳化

HNSW參數詳解

                    HNSW索引結構
            ┌──────────────────────────┐
            │     Layer 2 (稀疏)        │
            │       ○──────○           │
            │      /        \          │
            ├─────/──────────\─────────┤
            │   Layer 1 (中等)         │
            │   ○──○──○──○──○          │
            │  / \  |   |  / \         │
            ├─/──\─│───│─/──\──────────┤
            │ Layer 0 (密集)           │
            │ ○─○─○─○─○─○─○─○─○       │
            │  m=16 每節點最多16條邊    │
            └──────────────────────────┘

    ef_construction: 建構時搜尋寬度(越大越精確,建構越慢)
    m: 每個節點的最大連線數(越大越精確,記憶體越大)
    ef_search: 查詢時搜尋寬度(越大越精確,查詢越慢)
參數 預設值 建議範圍 影響
m 16 8-64 連線度,越大召回越高、記憶體越大
ef_construction 64 32-200 建構品質,越大索引越好、建構越慢
ef_search 40 20-200 查詢精度,越大召回越高、查詢越慢

不同場景的索引參數

-- 高召回場景(法律/醫療文件檢索)
CREATE INDEX idx_docs_embedding_high_recall ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 48, ef_construction = 128);

-- 平衡場景(通用RAG)
CREATE INDEX idx_docs_embedding_balanced ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- 低延遲場景(即時推薦)
CREATE INDEX idx_docs_embedding_low_latency ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 8, ef_construction = 32);

IVFFlat索引(大規模資料備選)

CREATE INDEX idx_docs_embedding_ivf ON documents
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
索引 建構時間(100萬行) 查詢延遲 召回率 增量友好 記憶體佔用
HNSW m=16 ~15min ~2ms 98% 2-3x
HNSW m=48 ~45min ~5ms 99.5% 4-5x
IVFFlat lists=100 ~3min ~3ms 90% 否(需REINDEX) 1-1.5x

查詢時動態調整精度

SET LOCAL hnsw.ef_search = 200;
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 10;

SET LOCAL hnsw.ef_search = 20;
SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 5;

Pattern 3:混合檢索(向量+全文+關鍵詞)

為什麼需要混合檢索

純向量檢索擅長語意匹配但可能遺漏精確關鍵詞;純全文檢索擅長關鍵詞匹配但無法理解語意。混合檢索結合兩者優勢,是2026年RAG系統的標配。

┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  向量檢索      │  │  全文檢索      │  │  關鍵詞檢索   │
│  (pgvector)   │  │  (tsvector)  │  │  (LIKE/ILIKE) │
│  語意匹配      │  │  詞頻匹配      │  │  精確匹配      │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                  │                  │
       ▼                  ▼                  ▼
┌──────────────────────────────────────────────────┐
│              RRF (Reciprocal Rank Fusion)         │
│  score = Σ 1/(k + rank_i)   k=60                 │
└──────────────────────┬───────────────────────────┘
                       │
                       ▼
              ┌──────────────────┐
              │  最終排序結果      │
              └──────────────────┘

三路混合檢索SQL

WITH vector_results AS (
    SELECT id, title, content,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS v_rank,
           1 - (embedding <=> $1::vector) AS v_score
    FROM knowledge_base
    WHERE 1 - (embedding <=> $1::vector) > 0.5
    ORDER BY embedding <=> $1::vector
    LIMIT 50
),
fulltext_results AS (
    SELECT id, title, content,
           ROW_NUMBER() OVER (
               ORDER BY ts_rank(tsv, plainto_tsquery('simple', $2)) DESC
           ) AS ft_rank,
           ts_rank(tsv, plainto_tsquery('simple', $2)) AS ft_score
    FROM knowledge_base
    WHERE tsv @@ plainto_tsquery('simple', $2)
    LIMIT 50
),
keyword_results AS (
    SELECT id, title, content,
           ROW_NUMBER() OVER (
               ORDER BY
                   CASE WHEN title ILIKE '%' || $2 || '%' THEN 0 ELSE 1 END,
                   CASE WHEN content ILIKE '%' || $2 || '%' THEN 0 ELSE 1 END
           ) AS kw_rank
    FROM knowledge_base
    WHERE title ILIKE '%' || $2 || '%'
       OR content ILIKE '%' || $2 || '%'
    LIMIT 50
),
rrf_combined AS (
    SELECT
        COALESCE(v.id, f.id, k.id) AS id,
        COALESCE(v.title, f.title, k.title) AS title,
        COALESCE(v.content, f.content, k.content) AS content,
        COALESCE(1.0 / (60 + v.v_rank), 0) * 0.5 +
        COALESCE(1.0 / (60 + f.ft_rank), 0) * 0.3 +
        COALESCE(1.0 / (60 + k.kw_rank), 0) * 0.2 AS rrf_score
    FROM vector_results v
    FULL OUTER JOIN fulltext_results f ON v.id = f.id
    FULL OUTER JOIN keyword_results k ON v.id = k.id
)
SELECT id, title, content, rrf_score
FROM rrf_combined
ORDER BY rrf_score DESC
LIMIT 10;

Python混合檢索封裝

import psycopg2
from openai import OpenAI
from dataclasses import dataclass

@dataclass
class HybridSearchResult:
    id: int
    title: str
    content: str
    score: float

class PgVectorHybridSearch:
    def __init__(self, db_url: str, openai_api_key: str):
        self.conn = psycopg2.connect(db_url)
        self.client = OpenAI(api_key=openai_api_key)

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        vector_weight: float = 0.5,
        fulltext_weight: float = 0.3,
        keyword_weight: float = 0.2,
        category: str | None = None,
    ) -> list[HybridSearchResult]:
        query_embedding = str(self.embed(query))
        category_filter = "AND category = %s" if category else ""
        params = [query_embedding, query_embedding]
        if category:
            params.append(category)
        params.extend([query_embedding, query])
        if category:
            params.append(category)
        params.extend([query, query])
        if category:
            params.append(category)
        params.extend([query_embedding, query, top_k])

        sql = f"""
        WITH vector_results AS (
            SELECT id, title, content,
                   ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS v_rank
            FROM knowledge_base
            WHERE 1 - (embedding <=> %s::vector) > 0.5
            {category_filter}
            ORDER BY embedding <=> %s::vector
            LIMIT 50
        ),
        fulltext_results AS (
            SELECT id, title, content,
                   ROW_NUMBER() OVER (
                       ORDER BY ts_rank(tsv, plainto_tsquery('simple', %s)) DESC
                   ) AS ft_rank
            FROM knowledge_base
            WHERE tsv @@ plainto_tsquery('simple', %s)
            {category_filter}
            LIMIT 50
        ),
        keyword_results AS (
            SELECT id, title, content,
                   ROW_NUMBER() OVER (
                       ORDER BY
                       CASE WHEN title ILIKE '%%' || %s || '%%' THEN 0 ELSE 1 END
                   ) AS kw_rank
            FROM knowledge_base
            WHERE title ILIKE '%%' || %s || '%%'
               OR content ILIKE '%%' || %s || '%%'
            {category_filter}
            LIMIT 50
        ),
        rrf_combined AS (
            SELECT
                COALESCE(v.id, f.id, k.id) AS id,
                COALESCE(v.title, f.title, k.title) AS title,
                COALESCE(v.content, f.content, k.content) AS content,
                COALESCE(1.0 / (60 + v.v_rank), 0) * {vector_weight} +
                COALESCE(1.0 / (60 + f.ft_rank), 0) * {fulltext_weight} +
                COALESCE(1.0 / (60 + k.kw_rank), 0) * {keyword_weight} AS rrf_score
            FROM vector_results v
            FULL OUTER JOIN fulltext_results f ON v.id = f.id
            FULL OUTER JOIN keyword_results k ON v.id = k.id
        )
        SELECT id, title, content, rrf_score
        FROM rrf_combined
        ORDER BY rrf_score DESC
        LIMIT %s
        """

        cur = self.conn.cursor()
        cur.execute(sql, params)
        results = cur.fetchall()

        return [
            HybridSearchResult(id=r[0], title=r[1], content=r[2], score=r[3])
            for r in results
        ]

    def close(self):
        self.conn.close()

Pattern 4:重排序與結果融合

為什麼需要重排序

RRF融合後的結果排序基於排名而非語意相關性。重排序(Reranking)使用Cross-Encoder對候選結果進行精排,顯著提升最終結果品質。

┌──────────────┐
│  混合檢索      │
│  Top-50候選    │
└──────┬───────┘
       │
       ▼
┌──────────────┐     ┌──────────────┐
│  Reranker     │────▶│  精排結果      │
│  Cross-Encoder│     │  Top-10       │
│  逐對打分      │     │  高品質        │
└──────────────┘     └──────────────┘

PostgreSQL內重排序(基於業務規則)

WITH hybrid_results AS (
    SELECT id, title, content, rrf_score
    FROM rrf_combined
    ORDER BY rrf_score DESC
    LIMIT 30
)
SELECT
    id, title, content,
    rrf_score AS base_score,
    rrf_score
        * CASE
            WHEN published_at > CURRENT_DATE - INTERVAL '30 days' THEN 1.2
            WHEN published_at > CURRENT_DATE - INTERVAL '90 days' THEN 1.1
            ELSE 1.0
        END
        * CASE
            WHEN doc_type = 'official' THEN 1.3
            WHEN doc_type = 'tutorial' THEN 1.1
            ELSE 1.0
        END AS final_score
FROM hybrid_results h
JOIN knowledge_base kb ON h.id = kb.id
ORDER BY final_score DESC
LIMIT 10;

Python Cross-Encoder重排序

from sentence_transformers import CrossEncoder
import psycopg2
from openai import OpenAI

class RerankingRAG:
    def __init__(self, db_url: str):
        self.conn = psycopg2.connect(db_url)
        self.client = OpenAI()
        self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def search_with_rerank(self, query: str, top_k: int = 10, candidate_size: int = 30):
        query_embedding = str(self.embed(query))
        cur = self.conn.cursor()

        cur.execute(
            """SELECT id, title, content, 1 - (embedding <=> %s::vector) AS similarity
               FROM knowledge_base
               WHERE 1 - (embedding <=> %s::vector) > 0.5
               ORDER BY embedding <=> %s::vector
               LIMIT %s""",
            (query_embedding, query_embedding, query_embedding, candidate_size),
        )
        candidates = cur.fetchall()

        pairs = [(query, f"{row[1]}\n{row[2]}") for row in candidates]
        rerank_scores = self.reranker.predict(pairs)

        results = []
        for row, score in zip(candidates, rerank_scores):
            results.append({
                "id": row[0],
                "title": row[1],
                "content": row[2],
                "vector_score": float(row[3]),
                "rerank_score": float(score),
            })

        results.sort(key=lambda x: x["rerank_score"], reverse=True)
        return results[:top_k]

    def close(self):
        self.conn.close()

Pattern 5:多租戶資料隔離

列級安全策略(RLS)

ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON knowledge_base
    USING (tenant_id = current_setting('app.current_tenant_id')::INTEGER);

SET app.current_tenant_id = '42';

SELECT id, title, 1 - (embedding <=> $1::vector) AS similarity
FROM knowledge_base
ORDER BY embedding <=> $1::vector
LIMIT 10;

分割區表隔離

CREATE TABLE knowledge_base_partitioned (
    LIKE knowledge_base INCLUDING DEFAULTS
) PARTITION BY LIST (tenant_id);

CREATE TABLE kb_tenant_1 PARTITION OF knowledge_base_partitioned FOR VALUES IN (1);
CREATE TABLE kb_tenant_2 PARTITION OF knowledge_base_partitioned FOR VALUES IN (2);
CREATE TABLE kb_tenant_3 PARTITION OF knowledge_base_partitioned FOR VALUES IN (3);
CREATE TABLE kb_tenant_default PARTITION OF knowledge_base_partitioned DEFAULT;

CREATE INDEX idx_kb_t1_embedding ON kb_tenant_1
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);
CREATE INDEX idx_kb_t2_embedding ON kb_tenant_2
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

Python多租戶檢索

import psycopg2
from openai import OpenAI
from contextlib import contextmanager

@contextmanager
def tenant_session(conn: psycopg2.extensions.connection, tenant_id: int):
    cur = conn.cursor()
    cur.execute("SET app.current_tenant_id = %s", (str(tenant_id),))
    try:
        yield conn
    finally:
        cur.execute("RESET app.current_tenant_id")

class MultiTenantRAG:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.client = OpenAI(api_key=openai_api_key)

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def search(self, tenant_id: int, query: str, top_k: int = 10):
        conn = psycopg2.connect(self.db_url)
        query_embedding = str(self.embed(query))

        with tenant_session(conn, tenant_id):
            cur = conn.cursor()
            cur.execute(
                """SELECT id, title, content, 1 - (embedding <=> %s::vector) AS similarity
                   FROM knowledge_base
                   WHERE 1 - (embedding <=> %s::vector) > 0.6
                   ORDER BY embedding <=> %s::vector
                   LIMIT %s""",
                (query_embedding, query_embedding, query_embedding, top_k),
            )
            results = cur.fetchall()

        conn.close()
        return [
            {"id": r[0], "title": r[1], "content": r[2], "score": float(r[3])}
            for r in results
        ]

Pattern 6:效能調優與查詢最佳化

關鍵PostgreSQL參數

SET work_mem = '256MB';
SET maintenance_work_mem = '1GB';
SET max_parallel_workers_per_gather = 4;
SET hnsw.ef_search = 40;
SET ivfflat.probes = 10;

EXPLAIN ANALYZE分析

EXPLAIN ANALYZE
SELECT id, title, 1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM knowledge_base
WHERE 1 - (embedding <=> '[0.1, 0.2, ...]'::vector) > 0.7
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;

預過濾最佳化

CREATE INDEX idx_kb_ai_embedding ON knowledge_base
    USING hnsw (embedding vector_cosine_ops)
    WHERE category = 'AI';

連線池配置

# pgbouncer.ini
[databases]
mydb = host=127.0.0.1 port=5432 dbname=mydb

[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 50
reserve_pool_size = 10
reserve_pool_timeout = 3
server_idle_timeout = 300

批量寫入最佳化

import psycopg2
from openai import OpenAI
import io

client = OpenAI()

def bulk_insert_with_copy(
    conn_params: dict,
    records: list[dict],
):
    conn = psycopg2.connect(**conn_params)
    cur = conn.cursor()

    texts = [f"{r['title']}\n{r['content']}" for r in records]
    embeddings = []
    for i in range(0, len(texts), 100):
        batch = texts[i:i + 100]
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=batch,
        )
        embeddings.extend([d.embedding for d in response.data])

    buffer = io.StringIO()
    for record, emb in zip(records, embeddings):
        line = "\t".join([
            str(record.get("tenant_id", 1)),
            record["title"].replace("\t", " ").replace("\n", " "),
            record["content"].replace("\t", " ").replace("\n", " "),
            str(emb),
        ])
        buffer.write(line + "\n")

    buffer.seek(0)
    cur.copy_from(buffer, "knowledge_base", columns=[
        "tenant_id", "title", "content", "embedding"
    ])
    conn.commit()
    cur.close()
    conn.close()

Pattern 7:生產部署與備份恢復

Docker Compose生產配置

services:
  postgres:
    image: pgvector/pgvector:pg16
    restart: always
    environment:
      POSTGRES_DB: rag_production
      POSTGRES_USER: rag_app
      POSTGRES_PASSWORD: ${PG_PASSWORD}
      POSTGRES_INITDB_ARGS: "--encoding=UTF-8"
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command: >
      postgres
        -c shared_buffers=4GB
        -c work_mem=256MB
        -c maintenance_work_mem=1GB
        -c max_parallel_workers_per_gather=4
        -c effective_cache_size=12GB
        -c random_page_cost=1.1
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U rag_app -d rag_production"]
      interval: 10s
      timeout: 5s
      retries: 5

  pgbouncer:
    image: edoburu/pgbouncer
    restart: always
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_USER: rag_app
      DB_PASSWORD: ${PG_PASSWORD}
      DB_NAME: rag_production
      POOL_MODE: transaction
      MAX_CLIENT_CONN: 1000
      DEFAULT_POOL_SIZE: 50
    ports:
      - "6432:5432"
    depends_on:
      postgres:
        condition: service_healthy

volumes:
  pgdata:
    driver: local

備份策略

#!/bin/bash
DB_NAME="rag_production"
DB_USER="rag_app"
BACKUP_DIR="/backups/pgvector"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p $BACKUP_DIR

pg_dump -U $DB_USER -d $DB_NAME -F c -f "${BACKUP_DIR}/full_${DATE}.dump"
pg_dump -U $DB_USER -d $DB_NAME -t knowledge_base -F c -f "${BACKUP_DIR}/kb_${DATE}.dump"

find $BACKUP_DIR -name "*.dump" -mtime +7 -delete

echo "Backup completed: ${DATE}"

恢復流程

pg_restore -U rag_app -d rag_production -c "${BACKUP_DIR}/full_20260616_030000.dump"
pg_restore -U rag_app -d rag_production -t knowledge_base "${BACKUP_DIR}/kb_20260616_030000.dump"

監控指標

SELECT pg_size_pretty(pg_relation_size('idx_kb_embedding')) AS index_size;
SELECT pg_size_pretty(pg_total_relation_size('knowledge_base')) AS total_size;

SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE indexname = 'idx_kb_embedding';

SELECT sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)) AS cache_hit_ratio
FROM pg_statio_user_tables
WHERE relname = 'knowledge_base';

5個常見坑及解決方案

坑1:HNSW索引建構OOM

現象:百萬級資料建構HNSW索引時記憶體溢位。

解決:增大maintenance_work_mem,降低ef_construction,使用CONCURRENTLY

SET maintenance_work_mem = '2GB';
CREATE INDEX CONCURRENTLY idx_docs_embedding ON documents
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 32);

坑2:向量維度不匹配

現象ERROR: expected 1536 dimensions, not 768

解決:確認Embedding模型輸出維度與表定義一致。

模型 輸出維度
text-embedding-3-small 1536
text-embedding-3-large 3072
text-embedding-ada-002 1536
bge-large-zh-v1.5 1024
bge-small-zh-v1.5 512
ALTER TABLE knowledge_base ALTER COLUMN embedding TYPE vector(768);

坑3:IVFFlat索引資料更新後召回率驟降

現象:大量INSERT後搜尋結果明顯不準。

解決:定期REINDEX或改用HNSW。

REINDEX INDEX CONCURRENTLY idx_docs_embedding_ivf;

坑4:混合檢索效能差

現象:三路混合檢索耗時超過2秒。

解決:限制各路候選集大小(各50條),確保分別走索引。

坑5:多租戶資料洩露

現象:租戶A的查詢返回了租戶B的資料。

解決:啟用RLS,所有查詢必須帶tenant_id過濾。

ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON knowledge_base
    USING (tenant_id = current_setting('app.current_tenant_id')::INTEGER);

10個常見報錯排查

序號 報錯資訊 原因 解決方法
1 extension "vector" not available pgvector未安裝 使用pgvector/pgvector Docker映像或編譯安裝
2 expected 1536 dimensions, not 768 向量維度不匹配 檢查Embedding模型,修改表定義vector(N)
3 operator does not exist: vector <=> unknown 缺少型別轉換 使用$1::vector顯式轉換
4 index row size exceeds maximum 2712 HNSW索引行過大 降低m參數,或使用halfvec(1536)
5 cannot create index concurrently pgvector版本過低 升級到0.7+
6 out of memory during index build 建構索引記憶體不足 增大maintenance_work_mem,降低ef_construction
7 invalid input syntax for type vector 向量格式錯誤 確保使用[0.1, 0.2, ...]格式
8 permission denied for schema public 無建立擴充套件權限 使用超級使用者執行CREATE EXTENSION
9 IVFFlat recall drops after inserts 新資料偏離聚類中心 定期REINDEX或改用HNSW
10 sequential scan on vector column 索引未被使用 檢查維度匹配、型別轉換、資料量是否過小

進階最佳化技巧

1. 半精度向量節省50%儲存

ALTER TABLE knowledge_base ALTER COLUMN embedding TYPE halfvec(1536);

CREATE INDEX idx_kb_embedding_half ON knowledge_base
    USING hnsw (embedding halfvec_cosine_ops)
    WITH (m = 16, ef_construction = 64);

2. 稀疏向量

CREATE TABLE sparse_docs (
    id BIGSERIAL PRIMARY KEY,
    content TEXT,
    embedding sparsevec(30000)
);

INSERT INTO sparse_docs (content, embedding)
VALUES ('example', '{1:0.1, 5:0.3, 100:0.7}/30000');

3. 時間分割區冷熱分離

CREATE TABLE knowledge_base_partitioned (
    LIKE knowledge_base INCLUDING DEFAULTS
) PARTITION BY RANGE (created_at);

CREATE TABLE kb_2026_q1 PARTITION OF knowledge_base_partitioned
    FOR VALUES FROM ('2026-01-01') TO ('2026-04-01');
CREATE TABLE kb_2026_q2 PARTITION OF knowledge_base_partitioned
    FOR VALUES FROM ('2026-04-01') TO ('2026-07-01');
CREATE TABLE kb_archive PARTITION OF knowledge_base_partitioned
    DEFAULT;

ALTER TABLE kb_archive SET TABLESPACE slow_disk;

4. 非同步嵌入更新

import psycopg2
import redis
import json
from openai import OpenAI

r = redis.Redis()
client = OpenAI()

def enqueue_embedding_update(doc_id: int, text: str):
    r.lpush("embedding_queue", json.dumps({"doc_id": doc_id, "text": text}))

def process_embedding_queue():
    conn = psycopg2.connect("dbname=mydb user=postgres password=password")
    cur = conn.cursor()

    while True:
        _, data = r.brpop("embedding_queue", timeout=30)
        if data is None:
            continue
        task = json.loads(data)
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=task["text"],
        )
        embedding = response.data[0].embedding

        cur.execute(
            "UPDATE knowledge_base SET embedding = %s WHERE id = %s",
            (str(embedding), task["doc_id"]),
        )
        conn.commit()

    cur.close()
    conn.close()

5. 查詢快取

import hashlib
import json
import redis

r = redis.Redis()

def cached_vector_search(query: str, search_fn, ttl: int = 3600):
    cache_key = f"vec_search:{hashlib.md5(query.encode()).hexdigest()}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    results = search_fn(query)
    r.setex(cache_key, ttl, json.dumps(results))
    return results

對比分析:pgvector vs Pinecone vs Milvus vs Qdrant

維度 pgvector Pinecone Milvus Qdrant
部署方式 PG擴充套件 全託管 自託管/託管 自託管/託管
維運成本 極低(複用PG) 高(按查詢計費)
混合查詢 原生SQL 有限過濾 有限過濾 過濾器
事務支援 ACID
最大維度 16000 20000 32768 65535
索引型別 HNSW/IVFFlat 自有 HNSW/IVF/DiskANN HNSW
水平擴展 需Citus 自動 自動 自動
適用規模 百萬級 十億級 十億級 億級
資料一致性 強一致 最終一致 最終一致 最終一致
半精度支援 halfvec 支援 支援 支援
多租戶 RLS/分割區 命名空間 集合 集合
學習曲線 低(SQL) 低(API)
成本(100萬向量) $0(已有PG) ~$70/月 ~$50/月(自託管) ~$40/月(自託管)

選型建議

  • 已有PostgreSQL + 資料量百萬級 → pgvector(零額外成本)
  • 全託管需求 + 不在意成本 → Pinecone
  • 資料量億級 + 需要水平擴展 → Milvus
  • 中等規模 + 需要靈活過濾 → Qdrant

線上工具推薦


相關閱讀


總結

PostgreSQL pgvector RAG在2026年已經成為中小規模RAG系統的首選方案。7種生產模式覆蓋了從表設計、索引最佳化、混合檢索、重排序、多租戶隔離到生產部署的完整鏈路。核心優勢:零額外維運成本、原生SQL混合查詢、ACID事務保證。百萬級資料量下HNSW索引效能優秀,三路混合檢索(向量+全文+關鍵詞)+ RRF融合 + Cross-Encoder重排序的召回率遠超純向量搜尋。只有當資料量達到億級或需要水平擴展時,才需要考慮專門的向量資料庫。


參考資源

本站提供瀏覽器本地工具,免註冊即可試用 →

#PostgreSQL#pgvector#RAG#向量检索#语义搜索#混合检索#2026#数据库