PostgreSQL + pgvector向量搜索:2026年不用向量数据库也能做RAG检索

编程语言

PostgreSQL + pgvector向量搜索:2026年不用向量数据库也能做RAG检索

你是不是一提到RAG就想到Pinecone、Milvus、Weaviate这些向量数据库?为了一个简单的语义搜索功能就要引入一套全新的数据库系统,运维成本翻倍,数据同步头疼不已?2026年,PostgreSQL的pgvector扩展已经足够成熟——你现有的PostgreSQL就能做向量搜索,不需要额外的向量数据库。


背景知识

向量搜索基础

向量搜索的核心是将文本、图像等非结构化数据转化为高维向量(Embedding),然后通过计算向量间的距离(余弦相似度、欧氏距离等)找到语义最相近的结果。

概念 说明 示例
Embedding 将数据映射为向量 text-embedding-3-large输出3072维向量
余弦相似度 向量夹角余弦值,范围[-1,1] 1表示完全相同
欧氏距离 向量间的直线距离 0表示完全相同
内积 向量点乘 适合归一化向量
ANN搜索 近似最近邻,牺牲精度换速度 HNSW、IVFFlat

pgvector支持的索引类型

索引 算法 构建速度 查询速度 召回率 内存占用 适用场景
HNSW 图索引 中小规模,高召回
IVFFlat 倒排索引 大规模,可接受精度损失

问题分析

为什么不用专门的向量数据库?

  1. 运维成本:新增一套数据库意味着备份、监控、升级、高可用全都要额外处理
  2. 数据一致性:业务数据在PostgreSQL,向量数据在向量数据库,双写一致性难以保证
  3. 查询能力:向量数据库的过滤能力有限,无法做复杂的关联查询
  4. 迁移风险:向量数据库市场碎片化,Pinecone/Milvus/Weaviate API不兼容

pgvector让你在PostgreSQL内完成向量搜索+业务过滤+关联查询,一站式解决。


分步实操

步骤1:安装pgvector扩展

-- 连接PostgreSQL
psql -U postgres -d mydb

-- 安装扩展
CREATE EXTENSION IF NOT EXISTS vector;

-- 验证版本
SELECT extversion FROM pg_extension WHERE extname = 'vector';
-- 0.8.0+

Docker方式:

# docker-compose.yml
services:
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: mydb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:

步骤2:创建向量表

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    source TEXT,
    category TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    embedding vector(1536)
);

-- 创建HNSW索引(推荐用于高召回场景)
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- 创建IVFFlat索引(推荐用于大规模数据)
-- CREATE INDEX ON documents
-- USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 100);

步骤3:生成和存储向量

import psycopg2
from openai import OpenAI

client = OpenAI()
conn = psycopg2.connect("dbname=mydb user=postgres password=password")
cur = conn.cursor()

def generate_embedding(text: str) -> list[float]:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding

def insert_document(title: str, content: str, source: str, category: str):
    full_text = f"{title}\n{content}"
    embedding = generate_embedding(full_text)

    cur.execute(
        "INSERT INTO documents (title, content, source, category, embedding) VALUES (%s, %s, %s, %s, %s)",
        (title, content, source, category, str(embedding)),
    )
    conn.commit()

insert_document(
    "K8s Gateway API指南",
    "Gateway API是Kubernetes新一代流量管理标准...",
    "toolsku.dev/blog",
    "DevOps",
)

步骤4:向量搜索查询

-- 余弦相似度搜索
SELECT id, title, content, 1 - (embedding <=> $1) AS similarity
FROM documents
WHERE 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT 10;

-- 带过滤条件的向量搜索
SELECT id, title, 1 - (embedding <=> $1) AS similarity
FROM documents
WHERE category = 'DevOps'
  AND 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT 10;

步骤5:混合查询(向量+全文)

-- 创建全文搜索索引
ALTER TABLE documents ADD COLUMN tsv tsvector
    GENERATED ALWAYS AS (to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(content, ''))) STORED;

CREATE INDEX ON documents USING gin(tsv);

-- 混合查询:向量搜索 + 全文搜索 + RRF融合
WITH vector_results AS (
    SELECT id, title,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1) AS vector_rank,
           1 - (embedding <=> $1) AS vector_score
    FROM documents
    ORDER BY embedding <=> $1
    LIMIT 50
),
text_results AS (
    SELECT id, title,
           ROW_NUMBER() OVER (ORDER BY ts_rank(tsv, plainto_tsquery('simple', $2)) DESC) AS text_rank,
           ts_rank(tsv, plainto_tsquery('simple', $2)) AS text_score
    FROM documents
    WHERE tsv @@ plainto_tsquery('simple', $2)
    LIMIT 50
),
combined AS (
    SELECT
        COALESCE(v.id, t.id) AS id,
        COALESCE(v.title, t.title) AS title,
        COALESCE(1.0 / (60 + v.vector_rank), 0) +
        COALESCE(1.0 / (60 + t.text_rank), 0) AS rrf_score
    FROM vector_results v
    FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT id, title, rrf_score
FROM combined
ORDER BY rrf_score DESC
LIMIT 10;

完整代码:RAG检索系统

import psycopg2
from openai import OpenAI
from dataclasses import dataclass

@dataclass
class SearchResult:
    id: int
    title: str
    content: str
    score: float
    source: str

class PgVectorRAG:
    def __init__(self, db_url: str, openai_api_key: str):
        self.conn = psycopg2.connect(db_url)
        self.client = OpenAI(api_key=openai_api_key)

    def embed(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def search(
        self,
        query: str,
        top_k: int = 10,
        threshold: float = 0.6,
        category: str | None = None,
        use_hybrid: bool = True,
    ) -> list[SearchResult]:
        query_embedding = self.embed(query)
        embedding_str = str(query_embedding)

        if use_hybrid:
            return self._hybrid_search(embedding_str, query, top_k, category)
        return self._vector_search(embedding_str, top_k, threshold, category)

    def _vector_search(
        self,
        embedding_str: str,
        top_k: int,
        threshold: float,
        category: str | None,
    ) -> list[SearchResult]:
        sql = """
            SELECT id, title, content, source,
                   1 - (embedding <=> %s::vector) AS similarity
            FROM documents
            WHERE 1 - (embedding <=> %s::vector) > %s
        """
        params: list = [embedding_str, embedding_str, threshold]

        if category:
            sql += " AND category = %s"
            params.append(category)

        sql += " ORDER BY embedding <=> %s::vector LIMIT %s"
        params.extend([embedding_str, top_k])

        cur = self.conn.cursor()
        cur.execute(sql, params)
        results = cur.fetchall()

        return [
            SearchResult(id=r[0], title=r[1], content=r[2], source=r[3], score=r[4])
            for r in results
        ]

    def _hybrid_search(
        self,
        embedding_str: str,
        query_text: str,
        top_k: int,
        category: str | None,
    ) -> list[SearchResult]:
        category_filter = "AND category = %s" if category else ""
        params: list = [embedding_str, embedding_str]

        if category:
            params.append(category)

        params.extend([embedding_str, query_text])

        if category:
            params.append(category)

        params.extend([embedding_str, top_k])

        sql = f"""
        WITH vector_results AS (
            SELECT id, title, content, source,
                   ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS v_rank
            FROM documents
            WHERE 1 - (embedding <=> %s::vector) > 0.5
            {category_filter}
            ORDER BY embedding <=> %s::vector
            LIMIT 50
        ),
        text_results AS (
            SELECT id, title, content, source,
                   ROW_NUMBER() OVER (ORDER BY ts_rank(tsv, plainto_tsquery('simple', %s)) DESC) AS t_rank
            FROM documents
            WHERE tsv @@ plainto_tsquery('simple', %s)
            {category_filter}
            LIMIT 50
        ),
        combined AS (
            SELECT
                COALESCE(v.id, t.id) AS id,
                COALESCE(v.title, t.title) AS title,
                COALESCE(v.content, t.content) AS content,
                COALESCE(v.source, t.source) AS source,
                COALESCE(1.0 / (60 + v.v_rank), 0) +
                COALESCE(1.0 / (60 + t.t_rank), 0) AS rrf_score
            FROM vector_results v
            FULL OUTER JOIN text_results t ON v.id = t.id
        )
        SELECT id, title, content, source, rrf_score
        FROM combined
        ORDER BY rrf_score DESC
        LIMIT %s
        """

        cur = self.conn.cursor()
        cur.execute(sql, params)
        results = cur.fetchall()

        return [
            SearchResult(id=r[0], title=r[1], content=r[2], source=r[3], score=r[4])
            for r in results
        ]

    def generate_answer(self, query: str, top_k: int = 5) -> str:
        results = self.search(query, top_k=top_k, use_hybrid=True)
        context = "\n\n".join(f"[{r.title}]\n{r.content}" for r in results)

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "基于以下参考资料回答问题。如果参考资料中没有相关信息,请说明。\n\n" + context},
                {"role": "user", "content": query},
            ],
        )
        return response.choices[0].message.content


if __name__ == "__main__":
    rag = PgVectorRAG(
        db_url="dbname=mydb user=postgres password=password",
        openai_api_key="sk-xxx",
    )

    answer = rag.generate_answer("如何配置K8s Gateway API?")
    print(answer)

避坑指南

坑1:HNSW索引构建时间过长

现象:百万级数据创建HNSW索引耗时数小时,期间表被锁定。

解决:使用CONCURRENTLY选项(pgvector 0.7+)或在低峰期构建。降低ef_construction参数(默认64→32)可加速构建但略微降低召回率。分批构建后合并。

坑2:向量维度不匹配导致插入失败

现象ERROR: expected 1536 dimensions, not 768

解决:确保Embedding模型输出维度与表定义的vector(N)一致。text-embedding-3-small输出1536维,text-embedding-3-large输出3072维。修改表定义:ALTER TABLE documents ALTER COLUMN embedding TYPE vector(768)

坑3:IVFFlat索引在数据更新后召回率骤降

现象:新增大量数据后,搜索结果明显不准。

解决:IVFFlat的聚类中心在索引创建时确定,新增数据会导致偏移。需要定期重建索引:REINDEX INDEX documents_embedding_idx。或使用HNSW索引(增量友好)。

坑4:混合查询性能差

现象:向量+全文混合查询耗时超过1秒。

解决:确保两个搜索分别走索引,使用EXPLAIN ANALYZE确认。限制各自返回的候选集大小(各50条),RRF融合在少量数据上很快。避免在混合查询中使用SELECT *

坑5:pgvector占用内存过大

现象:HNSW索引内存占用是原始数据的2-3倍。

解决:调整m参数(默认16→8)降低图连接度,减少内存占用但牺牲一些召回率。使用IVFFlat替代HNSW。考虑使用半精度向量:vector(1536)halfvec(1536)(pgvector 0.7+)。


报错排查

序号 报错信息 原因 解决方法
1 extension "vector" not available pgvector未安装 使用pgvector/pgvector Docker镜像或编译安装
2 expected 1536 dimensions, not 768 向量维度不匹配 检查Embedding模型,修改表定义的维度
3 operator does not exist: vector <=> unknown 缺少类型转换 使用%s::vector显式转换
4 index row size exceeds maximum HNSW索引行过大 降低m参数,或使用halfvec
5 cannot create index concurrently pgvector版本不支持 升级到0.7+或使用非并发方式
6 IVFFlat recall is low after inserts 新数据偏离聚类中心 定期REINDEX或改用HNSW
7 out of memory during index build 构建索引内存不足 增加work_mem,分批构建
8 invalid input syntax for type vector 向量格式错误 确保使用[0.1, 0.2, ...]格式
9 permission denied for schema public 用户无创建扩展权限 使用超级用户执行CREATE EXTENSION
10 query timeout on hybrid search 混合查询过慢 限制候选集大小,添加索引,检查EXPLAIN

进阶优化

1. 半精度向量节省存储

-- 使用halfvec(半精度浮点),存储减半
ALTER TABLE documents ALTER COLUMN embedding TYPE halfvec(1536);

-- 创建HNSW索引
CREATE INDEX ON documents
USING hnsw (embedding halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);

2. 分区表加速大规模搜索

CREATE TABLE documents_partitioned (
    LIKE documents INCLUDING DEFAULTS
) PARTITION BY LIST (category);

CREATE TABLE documents_devops PARTITION OF documents_partitioned FOR VALUES IN ('DevOps');
CREATE TABLE documents_frontend PARTITION OF documents_partitioned FOR VALUES IN ('前端工程');
CREATE TABLE documents_ai PARTITION OF documents_partitioned FOR VALUES IN ('AI与大数据');

3. 查询时调整HNSW搜索精度

-- 提高ef_search提升召回率(默认40)
SET hnsw.ef_search = 200;

-- 降低ef_search加速查询
SET hnsw.ef_search = 20;

4. 批量向量生成优化

import psycopg2
from openai import OpenAI
import math

client = OpenAI()

def batch_embed(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=batch,
        )
        embeddings.extend([d.embedding for d in response.data])
    return embeddings

def bulk_insert_documents(docs: list[dict]):
    texts = [f"{d['title']}\n{d['content']}" for d in docs]
    embeddings = batch_embed(texts)

    cur = conn.cursor()
    for doc, emb in zip(docs, embeddings):
        cur.execute(
            "INSERT INTO documents (title, content, source, category, embedding) VALUES (%s, %s, %s, %s, %s)",
            (doc['title'], doc['content'], doc['source'], doc['category'], str(emb)),
        )
    conn.commit()

对比分析

维度 pgvector Pinecone Milvus Weaviate Qdrant
部署方式 PostgreSQL扩展 全托管 自托管/托管 自托管/托管 自托管/托管
运维成本 极低(复用PG) 高(按查询计费)
混合查询 原生SQL 有限过滤 有限过滤 GraphQL过滤 过滤器
事务支持 ACID
最大维度 16000 20000 32768 65535 65535
索引类型 HNSW/IVFFlat 自有 HNSW/IVF/DiskANN HNSW HNSW
水平扩展 需Citus 自动 自动 自动 自动
适用规模 百万级 十亿级 十亿级 亿级 亿级
数据一致性 强一致 最终一致 最终一致 最终一致 最终一致

总结展望

总结:对于大多数RAG应用场景,PostgreSQL + pgvector已经完全够用。它的核心优势是零额外运维成本、原生SQL混合查询、ACID事务保证。百万级数据量下HNSW索引性能优秀,混合查询(向量+全文+RRF)召回率远超纯向量搜索。只有当数据量达到亿级或需要水平扩展时,才需要考虑专门的向量数据库。建议从pgvector起步,按需演进。


在线工具推荐

本站提供浏览器本地工具,免注册即可试用 →

#PostgreSQL#pgvector#向量搜索#语义检索#RAG#HNSW#IVFFlat#混合查询