TiDB向量搜索RAG实战:HTAP架构下语义检索的5个核心模式
向量数据库的痛点:为什么纯向量库撑不住生产RAG?
你的RAG系统上线了,用户搜"最近三个月的退款政策",向量检索返回了一堆语义相近但时间不对的文档。你想加个时间过滤,却发现向量数据库没有事务,关系查询和向量查询是两套系统,数据同步全靠手动脚本——纯向量数据库在生产RAG中暴露了四大硬伤:
- 无事务保障:向量写入和业务数据更新不在同一个事务里,数据一致性靠运气
- 关系与向量割裂:标量过滤只能做元数据预筛,无法真正融合SQL查询和语义检索
- 数据同步复杂:业务库和向量库双写,CDC管道维护成本高,延迟大
- 成本翻倍:两套数据库意味着双倍存储、双倍运维、双倍故障点
TiDB向量搜索用HTAP架构一石三鸟:一个数据库同时承载事务、分析和向量检索,SQL与向量查询天然融合。
核心概念速查
| 概念 | 说明 | 典型实现 |
|---|---|---|
| TiDB向量搜索 | TiDB原生向量列类型与HNSW索引 | TiDB 8.0+ |
| HTAP | 混合事务与分析处理,行存+列存 | TiDB TiFlash |
| 向量索引 | 高维向量的近似最近邻索引结构 | HNSW、IVF |
| HNSW | 基于分层可导航小世界图的ANN算法 | ef_construction、max_level |
| 混合查询 | SQL标量过滤与向量相似度联合查询 | WHERE + ORDER BY vec_cosine |
| 语义检索 | 基于语义嵌入的相似度搜索 | cosine、L2、inner product |
| 嵌入模型 | 将文本映射为稠密向量的神经网络 | text-embedding-3-small、bge-large |
| RAG | 检索增强生成,LLM+外部知识检索 | LangChain、LlamaIndex |
| 全文检索 | 基于倒排索引的关键词搜索 | MATCH AGAINST、FULLTEXT |
| 标量过滤 | 在向量检索前/后应用条件过滤 | category='tech'、date > '2026-01' |
问题分析:TiDB向量RAG的5大挑战
| # | 挑战 | 具体表现 | 影响 |
|---|---|---|---|
| 1 | 向量与关系数据融合 | 向量列与业务列在不同表,JOIN查询性能差 | 混合查询延迟高,用户体验差 |
| 2 | 查询性能优化 | HNSW参数调优复杂,大数据集召回率与延迟难平衡 | 检索慢或召回不足 |
| 3 | 嵌入模型选择 | 维度、语言、领域适配差异大 | 嵌入质量差导致检索精度低 |
| 4 | 数据更新与索引维护 | 增量写入触发索引重建,大批量更新阻塞在线查询 | 写入期间查询超时 |
| 5 | 成本控制 | 向量列占用存储大,HTAP集群资源消耗高 | 存储和计算成本超预算 |
这5个挑战环环相扣:模型选择决定向量维度和索引参数,索引参数影响查询性能,数据更新策略依赖索引机制,成本又受所有因素制约。生产级TiDB向量RAG必须系统性地解决这些问题。
分步实操:5个核心模式
模式1:TiDB向量表设计与索引创建
RAG的第一步——设计融合业务字段与向量列的表结构,创建HNSW索引。
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Index
from sqlalchemy.orm import declarative_base, Session
from tidb_vector.sqlalchemy import VectorType
from datetime import datetime
Base = declarative_base()
class KnowledgeDoc(Base):
__tablename__ = "knowledge_docs"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
category = Column(String(50), index=True)
source = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
embedding = Column(VectorType(1536))
__table_args__ = (
Index("idx_category_created", "category", "created_at"),
)
TIDB_CONN = "mysql+pymysql://root@127.0.0.1:4000/rag_db"
engine = create_engine(TIDB_CONN)
Base.metadata.create_all(engine)
def create_vector_index():
with engine.connect() as conn:
conn.execute("""
ALTER TABLE knowledge_docs
ADD VECTOR INDEX idx_embedding_vec
USING HNSW (embedding)
WITH (ef_construction = 128, m = 16)
""")
conn.commit()
create_vector_index()
print("向量表和HNSW索引创建完成")
适用场景:RAG知识库表设计、多字段混合查询需求。
模式2:嵌入生成与批量写入
将文档内容通过嵌入模型转为向量,批量写入TiDB。
import numpy as np
from openai import OpenAI
from sqlalchemy.orm import Session
class EmbeddingWriter:
def __init__(self, engine, api_key: str,
model: str = "text-embedding-3-small",
dim: int = 1536, batch_size: int = 100):
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.model = model
self.dim = dim
self.batch_size = batch_size
def generate_embedding(self, text: str) -> list[float]:
response = self.client.embeddings.create(
input=text, model=self.model, dimensions=self.dim
)
return response.data[0].embedding
def batch_embed(self, texts: list[str]) -> list[list[float]]:
response = self.client.embeddings.create(
input=texts, model=self.model, dimensions=self.dim
)
return [item.embedding for item in response.data]
def write_documents(self, docs: list[dict]) -> int:
written = 0
for i in range(0, len(docs), self.batch_size):
batch = docs[i : i + self.batch_size]
texts = [d["content"] for d in batch]
embeddings = self.batch_embed(texts)
with Session(self.engine) as session:
for doc, emb in zip(batch, embeddings):
record = KnowledgeDoc(
title=doc["title"],
content=doc["content"],
category=doc.get("category", "general"),
source=doc.get("source", ""),
embedding=emb,
)
session.add(record)
session.commit()
written += len(batch)
return written
writer = EmbeddingWriter(engine, api_key="your-api-key")
docs = [
{"title": "退款政策", "content": "2026年Q2退款政策:购买30天内可全额退款...", "category": "policy"},
{"title": "API限流规则", "content": "免费版API限流100次/分钟,付费版1000次/分钟...", "category": "tech"},
]
count = writer.write_documents(docs)
print(f"写入{count}条文档")
适用场景:知识库批量导入、增量文档嵌入写入。
模式3:语义检索与标量过滤混合查询
TiDB的核心优势——在一条SQL中同时完成语义检索和业务条件过滤。
from dataclasses import dataclass
@dataclass
class SearchResult:
id: int
title: str
content: str
category: str
similarity: float
class HybridRetriever:
def __init__(self, engine, api_key: str,
model: str = "text-embedding-3-small"):
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.model = model
def search(self, query: str, category: str | None = None,
date_after: str | None = None,
top_k: int = 5) -> list[SearchResult]:
query_emb = self.client.embeddings.create(
input=query, model=self.model
).data[0].embedding
emb_str = str(query_emb)
sql = """
SELECT id, title, content, category,
1 - vec_cosine_distance(embedding, %s) AS similarity
FROM knowledge_docs
WHERE 1=1
"""
params: list = [emb_str]
if category:
sql += " AND category = %s"
params.append(category)
if date_after:
sql += " AND created_at > %s"
params.append(date_after)
sql += " ORDER BY similarity DESC LIMIT %s"
params.append(top_k)
with self.engine.connect() as conn:
rows = conn.execute(sql, params).fetchall()
return [
SearchResult(
id=r[0], title=r[1], content=r[2],
category=r[3], similarity=round(r[4], 4)
)
for r in rows
]
retriever = HybridRetriever(engine, api_key="your-api-key")
results = retriever.search(
query="退款政策", category="policy", date_after="2026-01-01", top_k=3
)
for r in results:
print(f"[{r.similarity:.3f}] {r.title} ({r.category})")
适用场景:带业务条件的语义搜索、多维度过滤检索。
模式4:全文检索与向量检索融合
关键词精确匹配+语义理解双路召回,用RRF算法融合排序。
from dataclasses import dataclass
@dataclass
class FusedResult:
id: int
title: str
content: str
rrf_score: float
vector_rank: int
fulltext_rank: int
class FusedRetriever:
def __init__(self, engine, api_key: str, k: int = 60):
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.k = k
def fused_search(self, query: str, top_k: int = 5) -> list[FusedResult]:
query_emb = self.client.embeddings.create(
input=query, model="text-embedding-3-small"
).data[0].embedding
emb_str = str(query_emb)
vec_sql = """
SELECT id, title, content,
ROW_NUMBER() OVER (ORDER BY vec_cosine_distance(embedding, %s)) AS vec_rank
FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, %s)
LIMIT 50
"""
ft_sql = """
SELECT id, title, content,
ROW_NUMBER() OVER (ORDER BY MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)) AS ft_rank
FROM knowledge_docs
WHERE MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)
LIMIT 50
"""
with self.engine.connect() as conn:
vec_rows = conn.execute(vec_sql, [emb_str, emb_str]).fetchall()
ft_rows = conn.execute(ft_sql, [query, query]).fetchall()
rrf_scores: dict[int, dict] = {}
for row in vec_rows:
doc_id = row[0]
rrf_scores[doc_id] = {
"title": row[1], "content": row[2],
"vec_rank": row[3], "ft_rank": 9999,
"rrf": 1.0 / (self.k + row[3]),
}
for row in ft_rows:
doc_id = row[0]
if doc_id in rrf_scores:
rrf_scores[doc_id]["ft_rank"] = row[3]
rrf_scores[doc_id]["rrf"] += 1.0 / (self.k + row[3])
else:
rrf_scores[doc_id] = {
"title": row[1], "content": row[2],
"vec_rank": 9999, "ft_rank": row[3],
"rrf": 1.0 / (self.k + row[3]),
}
sorted_results = sorted(
rrf_scores.items(), key=lambda x: x[1]["rrf"], reverse=True
)[:top_k]
return [
FusedResult(
id=doc_id, title=v["title"], content=v["content"],
rrf_score=round(v["rrf"], 5),
vector_rank=v["vec_rank"], fulltext_rank=v["ft_rank"],
)
for doc_id, v in sorted_results
]
fused = FusedRetriever(engine, api_key="your-api-key")
results = fused.fused_search("退款政策 2026", top_k=5)
for r in results:
print(f"[RRF:{r.rrf_score:.4f} V:{r.vector_rank} F:{r.fulltext_rank}] {r.title}")
适用场景:关键词+语义双路召回、专业术语精确匹配需求。
模式5:端到端RAG应用构建
将检索、重排、生成串联为完整RAG管线。
from dataclasses import dataclass
@dataclass
class RAGConfig:
retrieval_top_k: int = 10
rerank_top_n: int = 3
max_context_tokens: int = 3000
llm_model: str = "gpt-4o-mini"
embedding_model: str = "text-embedding-3-small"
class TiDBRAGPipeline:
def __init__(self, engine, api_key: str, config: RAGConfig | None = None):
self.config = config or RAGConfig()
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.retriever = HybridRetriever(
engine, api_key, model=self.config.embedding_model
)
def _rerank(self, query: str, results: list[SearchResult]) -> list[SearchResult]:
scored = []
for r in results:
overlap = sum(
1 for w in query if w in r.content
) / max(len(query), 1)
final_score = 0.7 * r.similarity + 0.3 * overlap
scored.append((r, final_score))
scored.sort(key=lambda x: x[1], reverse=True)
return [r for r, _ in scored[: self.config.rerank_top_n]]
def query(self, question: str, category: str | None = None) -> str:
results = self.retriever.search(
query=question, category=category,
top_k=self.config.retrieval_top_k
)
reranked = self._rerank(question, results)
context = "\n\n".join(
f"【{r.title}】({r.category})\n{r.content}" for r in reranked
)
if len(context) > self.config.max_context_tokens * 4:
context = context[: self.config.max_context_tokens * 4]
prompt = (
"基于以下检索结果回答问题。如果上下文信息不足,请明确说明。\n\n"
f"上下文:\n{context}\n\n问题:{question}"
)
response = self.client.chat.completions.create(
model=self.config.llm_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=800,
)
return response.choices[0].message.content
pipeline = TiDBRAGPipeline(engine, api_key="your-api-key")
answer = pipeline.query("2026年Q2的退款政策是什么?", category="policy")
print(answer)
适用场景:生产级RAG问答系统、企业知识库智能助手。
避坑指南:5个常见陷阱
陷阱1:向量列不建索引直接查
❌ 错误做法:
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;
✅ 正确做法:
ALTER TABLE knowledge_docs
ADD VECTOR INDEX idx_vec USING HNSW (embedding)
WITH (ef_construction = 128, m = 16);
-- 查询时指定ef_search
SET SESSION tidb_vector_ef_search = 64;
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;
陷阱2:嵌入维度与索引维度不匹配
❌ 错误做法:
embedding = Column(VectorType(768)) # 表定义768维
# 但写入时用了text-embedding-3-small的1536维向量
✅ 正确做法:
EMBEDDING_DIM = 1536
class KnowledgeDoc(Base):
embedding = Column(VectorType(EMBEDDING_DIM))
# 嵌入生成时显式指定维度
response = client.embeddings.create(
input=text, model="text-embedding-3-small", dimensions=EMBEDDING_DIM
)
陷阱3:大批量写入不关闭索引
❌ 错误做法:
for doc in large_doc_list:
session.add(KnowledgeDoc(embedding=doc["emb"]))
session.commit() # 每条写入都触发索引更新
✅ 正确做法:
# 先关闭向量索引,批量写入后再重建
with engine.connect() as conn:
conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec INVISIBLE")
conn.commit()
bulk_write(large_doc_list)
with engine.connect() as conn:
conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec VISIBLE")
conn.commit()
陷阱4:混合查询把标量过滤放在向量排序之后
❌ 错误做法:
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 100;
-- 然后在应用层过滤category
✅ 正确做法:
SELECT * FROM knowledge_docs
WHERE category = 'policy' AND created_at > '2026-01-01'
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 10;
陷阱5:RAG上下文不做截断直接喂LLM
❌ 错误做法:
context = "\n".join(r.content for r in all_results)
prompt = f"上下文:{context}\n问题:{question}"
✅ 正确做法:
MAX_CTX_TOKENS = 3000
def truncate_context(results: list[SearchResult]) -> str:
parts, total = [], 0
for r in results:
est_tokens = len(r.content) // 4
if total + est_tokens > MAX_CTX_TOKENS:
break
parts.append(f"【{r.title}】\n{r.content}")
total += est_tokens
return "\n\n".join(parts)
报错排查:10个常见错误
| # | 错误信息 | 原因 | 解决方案 |
|---|---|---|---|
| 1 | Vector dimension mismatch: expected 1536 got 768 |
嵌入维度与表定义不一致 | 统一嵌入模型维度或修改VectorType参数 |
| 2 | HNSW index build OOM |
ef_construction过大或数据量超内存 | 降低ef_construction至64,分批构建索引 |
| 3 | vec_cosine_distance function not found |
TiDB版本低于8.0或未启用向量功能 | 升级TiDB至8.0+,确认向量插件已加载 |
| 4 | FULLTEXT index not found on column content |
未创建全文索引 | ALTER TABLE knowledge_docs ADD FULLTEXT INDEX ft_content(content) |
| 5 | Query timeout during vector search |
HNSW ef_search过大或数据量超预期 | 降低ef_search,添加标量过滤缩小范围 |
| 6 | Duplicate entry for key 'PRIMARY' |
批量写入时ID冲突 | 使用auto_increment或显式指定ID范围 |
| 7 | Embedding API rate limit exceeded |
嵌入API调用频率超限 | 增加请求间隔,使用batch接口,实施退避重试 |
| 8 | TiDB connection pool exhausted |
并发查询过多,连接未释放 | 配置连接池大小,使用with语句确保连接归还 |
| 9 | Index invisible after bulk load |
批量写入后忘记恢复索引可见性 | 执行ALTER INDEX idx_vec VISIBLE并验证 |
| 10 | RRF fusion returns empty results |
向量检索和全文检索均无匹配 | 放宽相似度阈值,增加fallback检索策略 |
进阶优化:4个关键技巧
1. HNSW参数自适应调优
class HNSWAutoTuner:
def __init__(self, engine):
self.engine = engine
def recommend_params(self, row_count: int,
recall_target: float = 0.95) -> dict:
if row_count < 100_000:
return {"ef_construction": 64, "m": 8, "ef_search": 40}
elif row_count < 1_000_000:
return {"ef_construction": 128, "m": 16, "ef_search": 64}
else:
return {"ef_construction": 256, "m": 24, "ef_search": 128}
def apply(self, table: str, params: dict):
with self.engine.connect() as conn:
conn.execute(
f"ALTER TABLE {table} ADD VECTOR INDEX idx_vec "
f"USING HNSW (embedding) WITH "
f"(ef_construction={params['ef_construction']}, m={params['m']})"
)
conn.execute(
f"SET SESSION tidb_vector_ef_search = {params['ef_search']}"
)
conn.commit()
2. 嵌入缓存避免重复计算
import hashlib
import json
class EmbeddingCache:
def __init__(self, engine, ttl_hours: int = 24):
self.engine = engine
self.ttl = ttl_hours
self._local: dict[str, list[float]] = {}
def _cache_key(self, text: str, model: str) -> str:
raw = f"{model}:{text}"
return hashlib.md5(raw.encode()).hexdigest()
def get_or_compute(self, text: str, model: str,
compute_fn) -> list[float]:
key = self._cache_key(text, model)
if key in self._local:
return self._local[key]
emb = compute_fn(text, model)
self._local[key] = emb
return emb
3. 查询路由:自动选择检索策略
class QueryRouter:
def route(self, query: str) -> str:
has_exact_terms = any(
kw in query for kw in ["编号", "ID", "代码", "版本号"]
)
is_recent = any(
kw in query for kw in ["最新", "最近", "今天", "本周"]
)
if has_exact_terms and is_recent:
return "fused"
elif has_exact_terms:
return "fulltext"
elif is_recent:
return "hybrid"
else:
return "vector"
4. 向量检索性能监控
class VectorSearchMonitor:
def __init__(self, engine):
self.engine = engine
def get_index_stats(self, table: str) -> dict:
with self.engine.connect() as conn:
count = conn.execute(
f"SELECT COUNT(*) FROM {table}"
).scalar()
null_embs = conn.execute(
f"SELECT COUNT(*) FROM {table} WHERE embedding IS NULL"
).scalar()
return {
"total_rows": count,
"null_embeddings": null_embs,
"coverage": round((count - null_embs) / count, 4) if count else 0,
}
对比分析:4种向量检索方案全面对比
| 维度 | TiDB Vector | Pinecone | Milvus | pgvector |
|---|---|---|---|---|
| SQL+向量融合 | 原生支持 | 不支持 | 标量过滤 | 原生支持 |
| 事务ACID | 完整ACID | 无 | 有限 | 完整ACID |
| HTAP分析 | 行存+列存 | 无 | 无 | 无 |
| 水平扩展 | 自动扩缩 | 自动 | 手动分片 | 需Citus |
| 索引类型 | HNSW | 自动 | IVF/HNSW/DiskANN | HNSW/IVFFlat |
| 维度上限 | 16384 | 2048 | 32768 | 2000(HNSW) |
| 全文检索 | 原生FULLTEXT | 无 | 无 | tsvector |
| 部署方式 | 自托管/Serverless | 仅云 | 自托管/云 | 自托管 |
| 运维复杂度 | 低(托管) | 极低 | 高 | 中 |
| 适用场景 | RAG+业务融合 | 纯向量SaaS | 大规模向量 | PG生态 |
TiDB Vector的核心优势:一个数据库搞定SQL+向量+全文+事务,无需数据同步管道。
总结展望
TiDB向量搜索正在成为2026年RAG生产部署的关键基础设施:
- Serverless向量:TiDB Cloud Serverless按向量查询量计费,零运维启动RAG
- 多模态向量:图像、音频嵌入与文本嵌入统一存储,跨模态检索
- 流式嵌入更新:CDC驱动的自动嵌入重算,数据变更实时反映到向量索引
- 自适应索引:根据查询模式自动调整HNSW参数,无需手动调优
- RAG评测标准化:TiDB内置检索质量指标,端到端RAG评测开箱即用
选择TiDB向量RAG的原则:如果你的RAG需要SQL过滤、事务保障或业务数据融合,TiDB是唯一的一站式选择。纯向量检索场景可考虑Pinecone,超大规模向量库可考虑Milvus。起步建议用TiDB Cloud Serverless免费额度验证效果。
在线工具推荐
本站提供浏览器本地工具,免注册即可试用 →