PostgreSQL + pgvector向量搜索:2026年不用向量数据库也能做RAG检索
PostgreSQL + pgvector向量搜索:2026年不用向量数据库也能做RAG检索
你是不是一提到RAG就想到Pinecone、Milvus、Weaviate这些向量数据库?为了一个简单的语义搜索功能就要引入一套全新的数据库系统,运维成本翻倍,数据同步头疼不已?2026年,PostgreSQL的pgvector扩展已经足够成熟——你现有的PostgreSQL就能做向量搜索,不需要额外的向量数据库。
背景知识
向量搜索基础
向量搜索的核心是将文本、图像等非结构化数据转化为高维向量(Embedding),然后通过计算向量间的距离(余弦相似度、欧氏距离等)找到语义最相近的结果。
| 概念 | 说明 | 示例 |
|---|---|---|
| Embedding | 将数据映射为向量 | text-embedding-3-large输出3072维向量 |
| 余弦相似度 | 向量夹角余弦值,范围[-1,1] | 1表示完全相同 |
| 欧氏距离 | 向量间的直线距离 | 0表示完全相同 |
| 内积 | 向量点乘 | 适合归一化向量 |
| ANN搜索 | 近似最近邻,牺牲精度换速度 | HNSW、IVFFlat |
pgvector支持的索引类型
| 索引 | 算法 | 构建速度 | 查询速度 | 召回率 | 内存占用 | 适用场景 |
|---|---|---|---|---|---|---|
| HNSW | 图索引 | 慢 | 快 | 高 | 高 | 中小规模,高召回 |
| IVFFlat | 倒排索引 | 快 | 中 | 中 | 低 | 大规模,可接受精度损失 |
问题分析
为什么不用专门的向量数据库?
- 运维成本:新增一套数据库意味着备份、监控、升级、高可用全都要额外处理
- 数据一致性:业务数据在PostgreSQL,向量数据在向量数据库,双写一致性难以保证
- 查询能力:向量数据库的过滤能力有限,无法做复杂的关联查询
- 迁移风险:向量数据库市场碎片化,Pinecone/Milvus/Weaviate API不兼容
pgvector让你在PostgreSQL内完成向量搜索+业务过滤+关联查询,一站式解决。
分步实操
步骤1:安装pgvector扩展
-- 连接PostgreSQL
psql -U postgres -d mydb
-- 安装扩展
CREATE EXTENSION IF NOT EXISTS vector;
-- 验证版本
SELECT extversion FROM pg_extension WHERE extname = 'vector';
-- 0.8.0+
Docker方式:
# docker-compose.yml
services:
postgres:
image: pgvector/pgvector:pg16
environment:
POSTGRES_DB: mydb
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:
步骤2:创建向量表
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
title TEXT NOT NULL,
content TEXT NOT NULL,
source TEXT,
category TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
embedding vector(1536)
);
-- 创建HNSW索引(推荐用于高召回场景)
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- 创建IVFFlat索引(推荐用于大规模数据)
-- CREATE INDEX ON documents
-- USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 100);
步骤3:生成和存储向量
import psycopg2
from openai import OpenAI
client = OpenAI()
conn = psycopg2.connect("dbname=mydb user=postgres password=password")
cur = conn.cursor()
def generate_embedding(text: str) -> list[float]:
response = client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def insert_document(title: str, content: str, source: str, category: str):
full_text = f"{title}\n{content}"
embedding = generate_embedding(full_text)
cur.execute(
"INSERT INTO documents (title, content, source, category, embedding) VALUES (%s, %s, %s, %s, %s)",
(title, content, source, category, str(embedding)),
)
conn.commit()
insert_document(
"K8s Gateway API指南",
"Gateway API是Kubernetes新一代流量管理标准...",
"toolsku.dev/blog",
"DevOps",
)
步骤4:向量搜索查询
-- 余弦相似度搜索
SELECT id, title, content, 1 - (embedding <=> $1) AS similarity
FROM documents
WHERE 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT 10;
-- 带过滤条件的向量搜索
SELECT id, title, 1 - (embedding <=> $1) AS similarity
FROM documents
WHERE category = 'DevOps'
AND 1 - (embedding <=> $1) > 0.7
ORDER BY embedding <=> $1
LIMIT 10;
步骤5:混合查询(向量+全文)
-- 创建全文搜索索引
ALTER TABLE documents ADD COLUMN tsv tsvector
GENERATED ALWAYS AS (to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(content, ''))) STORED;
CREATE INDEX ON documents USING gin(tsv);
-- 混合查询:向量搜索 + 全文搜索 + RRF融合
WITH vector_results AS (
SELECT id, title,
ROW_NUMBER() OVER (ORDER BY embedding <=> $1) AS vector_rank,
1 - (embedding <=> $1) AS vector_score
FROM documents
ORDER BY embedding <=> $1
LIMIT 50
),
text_results AS (
SELECT id, title,
ROW_NUMBER() OVER (ORDER BY ts_rank(tsv, plainto_tsquery('simple', $2)) DESC) AS text_rank,
ts_rank(tsv, plainto_tsquery('simple', $2)) AS text_score
FROM documents
WHERE tsv @@ plainto_tsquery('simple', $2)
LIMIT 50
),
combined AS (
SELECT
COALESCE(v.id, t.id) AS id,
COALESCE(v.title, t.title) AS title,
COALESCE(1.0 / (60 + v.vector_rank), 0) +
COALESCE(1.0 / (60 + t.text_rank), 0) AS rrf_score
FROM vector_results v
FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT id, title, rrf_score
FROM combined
ORDER BY rrf_score DESC
LIMIT 10;
完整代码:RAG检索系统
import psycopg2
from openai import OpenAI
from dataclasses import dataclass
@dataclass
class SearchResult:
id: int
title: str
content: str
score: float
source: str
class PgVectorRAG:
def __init__(self, db_url: str, openai_api_key: str):
self.conn = psycopg2.connect(db_url)
self.client = OpenAI(api_key=openai_api_key)
def embed(self, text: str) -> list[float]:
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def search(
self,
query: str,
top_k: int = 10,
threshold: float = 0.6,
category: str | None = None,
use_hybrid: bool = True,
) -> list[SearchResult]:
query_embedding = self.embed(query)
embedding_str = str(query_embedding)
if use_hybrid:
return self._hybrid_search(embedding_str, query, top_k, category)
return self._vector_search(embedding_str, top_k, threshold, category)
def _vector_search(
self,
embedding_str: str,
top_k: int,
threshold: float,
category: str | None,
) -> list[SearchResult]:
sql = """
SELECT id, title, content, source,
1 - (embedding <=> %s::vector) AS similarity
FROM documents
WHERE 1 - (embedding <=> %s::vector) > %s
"""
params: list = [embedding_str, embedding_str, threshold]
if category:
sql += " AND category = %s"
params.append(category)
sql += " ORDER BY embedding <=> %s::vector LIMIT %s"
params.extend([embedding_str, top_k])
cur = self.conn.cursor()
cur.execute(sql, params)
results = cur.fetchall()
return [
SearchResult(id=r[0], title=r[1], content=r[2], source=r[3], score=r[4])
for r in results
]
def _hybrid_search(
self,
embedding_str: str,
query_text: str,
top_k: int,
category: str | None,
) -> list[SearchResult]:
category_filter = "AND category = %s" if category else ""
params: list = [embedding_str, embedding_str]
if category:
params.append(category)
params.extend([embedding_str, query_text])
if category:
params.append(category)
params.extend([embedding_str, top_k])
sql = f"""
WITH vector_results AS (
SELECT id, title, content, source,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS v_rank
FROM documents
WHERE 1 - (embedding <=> %s::vector) > 0.5
{category_filter}
ORDER BY embedding <=> %s::vector
LIMIT 50
),
text_results AS (
SELECT id, title, content, source,
ROW_NUMBER() OVER (ORDER BY ts_rank(tsv, plainto_tsquery('simple', %s)) DESC) AS t_rank
FROM documents
WHERE tsv @@ plainto_tsquery('simple', %s)
{category_filter}
LIMIT 50
),
combined AS (
SELECT
COALESCE(v.id, t.id) AS id,
COALESCE(v.title, t.title) AS title,
COALESCE(v.content, t.content) AS content,
COALESCE(v.source, t.source) AS source,
COALESCE(1.0 / (60 + v.v_rank), 0) +
COALESCE(1.0 / (60 + t.t_rank), 0) AS rrf_score
FROM vector_results v
FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT id, title, content, source, rrf_score
FROM combined
ORDER BY rrf_score DESC
LIMIT %s
"""
cur = self.conn.cursor()
cur.execute(sql, params)
results = cur.fetchall()
return [
SearchResult(id=r[0], title=r[1], content=r[2], source=r[3], score=r[4])
for r in results
]
def generate_answer(self, query: str, top_k: int = 5) -> str:
results = self.search(query, top_k=top_k, use_hybrid=True)
context = "\n\n".join(f"[{r.title}]\n{r.content}" for r in results)
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "基于以下参考资料回答问题。如果参考资料中没有相关信息,请说明。\n\n" + context},
{"role": "user", "content": query},
],
)
return response.choices[0].message.content
if __name__ == "__main__":
rag = PgVectorRAG(
db_url="dbname=mydb user=postgres password=password",
openai_api_key="sk-xxx",
)
answer = rag.generate_answer("如何配置K8s Gateway API?")
print(answer)
避坑指南
坑1:HNSW索引构建时间过长
现象:百万级数据创建HNSW索引耗时数小时,期间表被锁定。
解决:使用CONCURRENTLY选项(pgvector 0.7+)或在低峰期构建。降低ef_construction参数(默认64→32)可加速构建但略微降低召回率。分批构建后合并。
坑2:向量维度不匹配导致插入失败
现象:ERROR: expected 1536 dimensions, not 768。
解决:确保Embedding模型输出维度与表定义的vector(N)一致。text-embedding-3-small输出1536维,text-embedding-3-large输出3072维。修改表定义:ALTER TABLE documents ALTER COLUMN embedding TYPE vector(768)。
坑3:IVFFlat索引在数据更新后召回率骤降
现象:新增大量数据后,搜索结果明显不准。
解决:IVFFlat的聚类中心在索引创建时确定,新增数据会导致偏移。需要定期重建索引:REINDEX INDEX documents_embedding_idx。或使用HNSW索引(增量友好)。
坑4:混合查询性能差
现象:向量+全文混合查询耗时超过1秒。
解决:确保两个搜索分别走索引,使用EXPLAIN ANALYZE确认。限制各自返回的候选集大小(各50条),RRF融合在少量数据上很快。避免在混合查询中使用SELECT *。
坑5:pgvector占用内存过大
现象:HNSW索引内存占用是原始数据的2-3倍。
解决:调整m参数(默认16→8)降低图连接度,减少内存占用但牺牲一些召回率。使用IVFFlat替代HNSW。考虑使用半精度向量:vector(1536) → halfvec(1536)(pgvector 0.7+)。
报错排查
| 序号 | 报错信息 | 原因 | 解决方法 |
|---|---|---|---|
| 1 | extension "vector" not available |
pgvector未安装 | 使用pgvector/pgvector Docker镜像或编译安装 |
| 2 | expected 1536 dimensions, not 768 |
向量维度不匹配 | 检查Embedding模型,修改表定义的维度 |
| 3 | operator does not exist: vector <=> unknown |
缺少类型转换 | 使用%s::vector显式转换 |
| 4 | index row size exceeds maximum |
HNSW索引行过大 | 降低m参数,或使用halfvec |
| 5 | cannot create index concurrently |
pgvector版本不支持 | 升级到0.7+或使用非并发方式 |
| 6 | IVFFlat recall is low after inserts |
新数据偏离聚类中心 | 定期REINDEX或改用HNSW |
| 7 | out of memory during index build |
构建索引内存不足 | 增加work_mem,分批构建 |
| 8 | invalid input syntax for type vector |
向量格式错误 | 确保使用[0.1, 0.2, ...]格式 |
| 9 | permission denied for schema public |
用户无创建扩展权限 | 使用超级用户执行CREATE EXTENSION |
| 10 | query timeout on hybrid search |
混合查询过慢 | 限制候选集大小,添加索引,检查EXPLAIN |
进阶优化
1. 半精度向量节省存储
-- 使用halfvec(半精度浮点),存储减半
ALTER TABLE documents ALTER COLUMN embedding TYPE halfvec(1536);
-- 创建HNSW索引
CREATE INDEX ON documents
USING hnsw (embedding halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);
2. 分区表加速大规模搜索
CREATE TABLE documents_partitioned (
LIKE documents INCLUDING DEFAULTS
) PARTITION BY LIST (category);
CREATE TABLE documents_devops PARTITION OF documents_partitioned FOR VALUES IN ('DevOps');
CREATE TABLE documents_frontend PARTITION OF documents_partitioned FOR VALUES IN ('前端工程');
CREATE TABLE documents_ai PARTITION OF documents_partitioned FOR VALUES IN ('AI与大数据');
3. 查询时调整HNSW搜索精度
-- 提高ef_search提升召回率(默认40)
SET hnsw.ef_search = 200;
-- 降低ef_search加速查询
SET hnsw.ef_search = 20;
4. 批量向量生成优化
import psycopg2
from openai import OpenAI
import math
client = OpenAI()
def batch_embed(texts: list[str], batch_size: int = 100) -> list[list[float]]:
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
response = client.embeddings.create(
model="text-embedding-3-small",
input=batch,
)
embeddings.extend([d.embedding for d in response.data])
return embeddings
def bulk_insert_documents(docs: list[dict]):
texts = [f"{d['title']}\n{d['content']}" for d in docs]
embeddings = batch_embed(texts)
cur = conn.cursor()
for doc, emb in zip(docs, embeddings):
cur.execute(
"INSERT INTO documents (title, content, source, category, embedding) VALUES (%s, %s, %s, %s, %s)",
(doc['title'], doc['content'], doc['source'], doc['category'], str(emb)),
)
conn.commit()
对比分析
| 维度 | pgvector | Pinecone | Milvus | Weaviate | Qdrant |
|---|---|---|---|---|---|
| 部署方式 | PostgreSQL扩展 | 全托管 | 自托管/托管 | 自托管/托管 | 自托管/托管 |
| 运维成本 | 极低(复用PG) | 高(按查询计费) | 高 | 高 | 中 |
| 混合查询 | 原生SQL | 有限过滤 | 有限过滤 | GraphQL过滤 | 过滤器 |
| 事务支持 | ACID | 无 | 无 | 无 | 无 |
| 最大维度 | 16000 | 20000 | 32768 | 65535 | 65535 |
| 索引类型 | HNSW/IVFFlat | 自有 | HNSW/IVF/DiskANN | HNSW | HNSW |
| 水平扩展 | 需Citus | 自动 | 自动 | 自动 | 自动 |
| 适用规模 | 百万级 | 十亿级 | 十亿级 | 亿级 | 亿级 |
| 数据一致性 | 强一致 | 最终一致 | 最终一致 | 最终一致 | 最终一致 |
总结展望
总结:对于大多数RAG应用场景,PostgreSQL + pgvector已经完全够用。它的核心优势是零额外运维成本、原生SQL混合查询、ACID事务保证。百万级数据量下HNSW索引性能优秀,混合查询(向量+全文+RRF)召回率远超纯向量搜索。只有当数据量达到亿级或需要水平扩展时,才需要考虑专门的向量数据库。建议从pgvector起步,按需演进。
在线工具推荐
- JSON格式化(API响应调试):/zh-CN/json/format
- Base64编解码(向量数据处理):/zh-CN/encode/base64
- curl转代码(API测试):/zh-CN/dev/curl-to-code
本站提供浏览器本地工具,免注册即可试用 →