TiDB向量搜尋RAG實戰:HTAP架構下語義檢索的5個核心模式
向量資料庫的痛點:為什麼純向量庫撐不住生產RAG?
你的RAG系統上線了,用戶搜「最近三個月的退款政策」,向量檢索返回了一堆語義相近但時間不對的文檔。你想加個時間過濾,卻發現向量資料庫沒有事務,關係查詢和向量查詢是兩套系統,資料同步全靠手動腳本——純向量資料庫在生產RAG中暴露了四大硬傷:
- 無事務保障:向量寫入和業務資料更新不在同一個事務裡,資料一致性靠運氣
- 關係與向量割裂:標量過濾只能做元資料預篩,無法真正融合SQL查詢和語義檢索
- 資料同步複雜:業務庫和向量庫雙寫,CDC管道維護成本高,延遲大
- 成本翻倍:兩套資料庫意味著雙倍存儲、雙倍運維、雙倍故障點
TiDB向量搜尋用HTAP架構一石三鳥:一個資料庫同時承載事務、分析和向量檢索,SQL與向量查詢天然融合。
核心概念速查
| 概念 | 說明 | 典型實現 |
|---|---|---|
| TiDB向量搜尋 | TiDB原生向量列類型與HNSW索引 | TiDB 8.0+ |
| HTAP | 混合事務與分析處理,行存+列存 | TiDB TiFlash |
| 向量索引 | 高維向量的近似最近鄰索引結構 | HNSW、IVF |
| HNSW | 基於分層可導航小世界圖的ANN算法 | ef_construction、max_level |
| 混合查詢 | SQL標量過濾與向量相似度聯合查詢 | WHERE + ORDER BY vec_cosine |
| 語義檢索 | 基於語義嵌入的相似度搜尋 | cosine、L2、inner product |
| 嵌入模型 | 將文本映射為稠密向量的神經網絡 | text-embedding-3-small、bge-large |
| RAG | 檢索增強生成,LLM+外部知識檢索 | LangChain、LlamaIndex |
| 全文檢索 | 基於倒排索引的關鍵詞搜尋 | MATCH AGAINST、FULLTEXT |
| 標量過濾 | 在向量檢索前/後應用條件過濾 | category='tech'、date > '2026-01' |
問題分析:TiDB向量RAG的5大挑戰
| # | 挑戰 | 具體表現 | 影響 |
|---|---|---|---|
| 1 | 向量與關係資料融合 | 向量列與業務列在不同表,JOIN查詢效能差 | 混合查詢延遲高,用戶體驗差 |
| 2 | 查詢效能優化 | HNSW參數調優複雜,大資料集召回率與延遲難平衡 | 檢索慢或召回不足 |
| 3 | 嵌入模型選擇 | 維度、語言、領域適配差異大 | 嵌入品質差導致檢索精度低 |
| 4 | 資料更新與索引維護 | 增量寫入觸發索引重建,大批量更新阻塞在線查詢 | 寫入期間查詢超時 |
| 5 | 成本控制 | 向量列佔用存儲大,HTAP叢集資源消耗高 | 存儲和計算成本超預算 |
這5個挑戰環環相扣:模型選擇決定向量維度和索引參數,索引參數影響查詢效能,資料更新策略依賴索引機制,成本又受所有因素制約。生產級TiDB向量RAG必須系統性地解決這些問題。
分步實操:5個核心模式
模式1:TiDB向量表設計與索引建立
RAG的第一步——設計融合業務欄位與向量列的表結構,建立HNSW索引。
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Index
from sqlalchemy.orm import declarative_base, Session
from tidb_vector.sqlalchemy import VectorType
from datetime import datetime
Base = declarative_base()
class KnowledgeDoc(Base):
__tablename__ = "knowledge_docs"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
category = Column(String(50), index=True)
source = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
embedding = Column(VectorType(1536))
__table_args__ = (
Index("idx_category_created", "category", "created_at"),
)
TIDB_CONN = "mysql+pymysql://root@127.0.0.1:4000/rag_db"
engine = create_engine(TIDB_CONN)
Base.metadata.create_all(engine)
def create_vector_index():
with engine.connect() as conn:
conn.execute("""
ALTER TABLE knowledge_docs
ADD VECTOR INDEX idx_embedding_vec
USING HNSW (embedding)
WITH (ef_construction = 128, m = 16)
""")
conn.commit()
create_vector_index()
print("向量表和HNSW索引建立完成")
適用場景:RAG知識庫表設計、多欄位混合查詢需求。
模式2:嵌入生成與批量寫入
將文檔內容通過嵌入模型轉為向量,批量寫入TiDB。
import numpy as np
from openai import OpenAI
from sqlalchemy.orm import Session
class EmbeddingWriter:
def __init__(self, engine, api_key: str,
model: str = "text-embedding-3-small",
dim: int = 1536, batch_size: int = 100):
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.model = model
self.dim = dim
self.batch_size = batch_size
def generate_embedding(self, text: str) -> list[float]:
response = self.client.embeddings.create(
input=text, model=self.model, dimensions=self.dim
)
return response.data[0].embedding
def batch_embed(self, texts: list[str]) -> list[list[float]]:
response = self.client.embeddings.create(
input=texts, model=self.model, dimensions=self.dim
)
return [item.embedding for item in response.data]
def write_documents(self, docs: list[dict]) -> int:
written = 0
for i in range(0, len(docs), self.batch_size):
batch = docs[i : i + self.batch_size]
texts = [d["content"] for d in batch]
embeddings = self.batch_embed(texts)
with Session(self.engine) as session:
for doc, emb in zip(batch, embeddings):
record = KnowledgeDoc(
title=doc["title"],
content=doc["content"],
category=doc.get("category", "general"),
source=doc.get("source", ""),
embedding=emb,
)
session.add(record)
session.commit()
written += len(batch)
return written
writer = EmbeddingWriter(engine, api_key="your-api-key")
docs = [
{"title": "退款政策", "content": "2026年Q2退款政策:購買30天內可全額退款...", "category": "policy"},
{"title": "API限流規則", "content": "免費版API限流100次/分鐘,付費版1000次/分鐘...", "category": "tech"},
]
count = writer.write_documents(docs)
print(f"寫入{count}條文檔")
適用場景:知識庫批量匯入、增量文檔嵌入寫入。
模式3:語義檢索與標量過濾混合查詢
TiDB的核心優勢——在一條SQL中同時完成語義檢索和業務條件過濾。
from dataclasses import dataclass
@dataclass
class SearchResult:
id: int
title: str
content: str
category: str
similarity: float
class HybridRetriever:
def __init__(self, engine, api_key: str,
model: str = "text-embedding-3-small"):
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.model = model
def search(self, query: str, category: str | None = None,
date_after: str | None = None,
top_k: int = 5) -> list[SearchResult]:
query_emb = self.client.embeddings.create(
input=query, model=self.model
).data[0].embedding
emb_str = str(query_emb)
sql = """
SELECT id, title, content, category,
1 - vec_cosine_distance(embedding, %s) AS similarity
FROM knowledge_docs
WHERE 1=1
"""
params: list = [emb_str]
if category:
sql += " AND category = %s"
params.append(category)
if date_after:
sql += " AND created_at > %s"
params.append(date_after)
sql += " ORDER BY similarity DESC LIMIT %s"
params.append(top_k)
with self.engine.connect() as conn:
rows = conn.execute(sql, params).fetchall()
return [
SearchResult(
id=r[0], title=r[1], content=r[2],
category=r[3], similarity=round(r[4], 4)
)
for r in rows
]
retriever = HybridRetriever(engine, api_key="your-api-key")
results = retriever.search(
query="退款政策", category="policy", date_after="2026-01-01", top_k=3
)
for r in results:
print(f"[{r.similarity:.3f}] {r.title} ({r.category})")
適用場景:帶業務條件的語義搜尋、多維度過濾檢索。
模式4:全文檢索與向量檢索融合
關鍵詞精確匹配+語義理解雙路召回,用RRF算法融合排序。
from dataclasses import dataclass
@dataclass
class FusedResult:
id: int
title: str
content: str
rrf_score: float
vector_rank: int
fulltext_rank: int
class FusedRetriever:
def __init__(self, engine, api_key: str, k: int = 60):
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.k = k
def fused_search(self, query: str, top_k: int = 5) -> list[FusedResult]:
query_emb = self.client.embeddings.create(
input=query, model="text-embedding-3-small"
).data[0].embedding
emb_str = str(query_emb)
vec_sql = """
SELECT id, title, content,
ROW_NUMBER() OVER (ORDER BY vec_cosine_distance(embedding, %s)) AS vec_rank
FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, %s)
LIMIT 50
"""
ft_sql = """
SELECT id, title, content,
ROW_NUMBER() OVER (ORDER BY MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)) AS ft_rank
FROM knowledge_docs
WHERE MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)
LIMIT 50
"""
with self.engine.connect() as conn:
vec_rows = conn.execute(vec_sql, [emb_str, emb_str]).fetchall()
ft_rows = conn.execute(ft_sql, [query, query]).fetchall()
rrf_scores: dict[int, dict] = {}
for row in vec_rows:
doc_id = row[0]
rrf_scores[doc_id] = {
"title": row[1], "content": row[2],
"vec_rank": row[3], "ft_rank": 9999,
"rrf": 1.0 / (self.k + row[3]),
}
for row in ft_rows:
doc_id = row[0]
if doc_id in rrf_scores:
rrf_scores[doc_id]["ft_rank"] = row[3]
rrf_scores[doc_id]["rrf"] += 1.0 / (self.k + row[3])
else:
rrf_scores[doc_id] = {
"title": row[1], "content": row[2],
"vec_rank": 9999, "ft_rank": row[3],
"rrf": 1.0 / (self.k + row[3]),
}
sorted_results = sorted(
rrf_scores.items(), key=lambda x: x[1]["rrf"], reverse=True
)[:top_k]
return [
FusedResult(
id=doc_id, title=v["title"], content=v["content"],
rrf_score=round(v["rrf"], 5),
vector_rank=v["vec_rank"], fulltext_rank=v["ft_rank"],
)
for doc_id, v in sorted_results
]
fused = FusedRetriever(engine, api_key="your-api-key")
results = fused.fused_search("退款政策 2026", top_k=5)
for r in results:
print(f"[RRF:{r.rrf_score:.4f} V:{r.vector_rank} F:{r.fulltext_rank}] {r.title}")
適用場景:關鍵詞+語義雙路召回、專業術語精確匹配需求。
模式5:端到端RAG應用構建
將檢索、重排、生成串聯為完整RAG管線。
from dataclasses import dataclass
@dataclass
class RAGConfig:
retrieval_top_k: int = 10
rerank_top_n: int = 3
max_context_tokens: int = 3000
llm_model: str = "gpt-4o-mini"
embedding_model: str = "text-embedding-3-small"
class TiDBRAGPipeline:
def __init__(self, engine, api_key: str, config: RAGConfig | None = None):
self.config = config or RAGConfig()
self.engine = engine
self.client = OpenAI(api_key=api_key)
self.retriever = HybridRetriever(
engine, api_key, model=self.config.embedding_model
)
def _rerank(self, query: str, results: list[SearchResult]) -> list[SearchResult]:
scored = []
for r in results:
overlap = sum(
1 for w in query if w in r.content
) / max(len(query), 1)
final_score = 0.7 * r.similarity + 0.3 * overlap
scored.append((r, final_score))
scored.sort(key=lambda x: x[1], reverse=True)
return [r for r, _ in scored[: self.config.rerank_top_n]]
def query(self, question: str, category: str | None = None) -> str:
results = self.retriever.search(
query=question, category=category,
top_k=self.config.retrieval_top_k
)
reranked = self._rerank(question, results)
context = "\n\n".join(
f"【{r.title}】({r.category})\n{r.content}" for r in reranked
)
if len(context) > self.config.max_context_tokens * 4:
context = context[: self.config.max_context_tokens * 4]
prompt = (
"基於以下檢索結果回答問題。如果上下文資訊不足,請明確說明。\n\n"
f"上下文:\n{context}\n\n問題:{question}"
)
response = self.client.chat.completions.create(
model=self.config.llm_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=800,
)
return response.choices[0].message.content
pipeline = TiDBRAGPipeline(engine, api_key="your-api-key")
answer = pipeline.query("2026年Q2的退款政策是什麼?", category="policy")
print(answer)
適用場景:生產級RAG問答系統、企業知識庫智慧助手。
避坑指南:5個常見陷阱
陷阱1:向量列不建索引直接查
❌ 錯誤做法:
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;
✅ 正確做法:
ALTER TABLE knowledge_docs
ADD VECTOR INDEX idx_vec USING HNSW (embedding)
WITH (ef_construction = 128, m = 16);
-- 查詢時指定ef_search
SET SESSION tidb_vector_ef_search = 64;
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;
陷阱2:嵌入維度與索引維度不匹配
❌ 錯誤做法:
embedding = Column(VectorType(768)) # 表定義768維
# 但寫入時用了text-embedding-3-small的1536維向量
✅ 正確做法:
EMBEDDING_DIM = 1536
class KnowledgeDoc(Base):
embedding = Column(VectorType(EMBEDDING_DIM))
# 嵌入生成時顯式指定維度
response = client.embeddings.create(
input=text, model="text-embedding-3-small", dimensions=EMBEDDING_DIM
)
陷阱3:大批量寫入不關閉索引
❌ 錯誤做法:
for doc in large_doc_list:
session.add(KnowledgeDoc(embedding=doc["emb"]))
session.commit() # 每條寫入都觸發索引更新
✅ 正確做法:
# 先關閉向量索引,批量寫入後再重建
with engine.connect() as conn:
conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec INVISIBLE")
conn.commit()
bulk_write(large_doc_list)
with engine.connect() as conn:
conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec VISIBLE")
conn.commit()
陷阱4:混合查詢把標量過濾放在向量排序之後
❌ 錯誤做法:
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 100;
-- 然後在應用層過濾category
✅ 正確做法:
SELECT * FROM knowledge_docs
WHERE category = 'policy' AND created_at > '2026-01-01'
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 10;
陷阱5:RAG上下文不做截斷直接餵LLM
❌ 錯誤做法:
context = "\n".join(r.content for r in all_results)
prompt = f"上下文:{context}\n問題:{question}"
✅ 正確做法:
MAX_CTX_TOKENS = 3000
def truncate_context(results: list[SearchResult]) -> str:
parts, total = [], 0
for r in results:
est_tokens = len(r.content) // 4
if total + est_tokens > MAX_CTX_TOKENS:
break
parts.append(f"【{r.title}】\n{r.content}")
total += est_tokens
return "\n\n".join(parts)
報錯排查:10個常見錯誤
| # | 錯誤資訊 | 原因 | 解決方案 |
|---|---|---|---|
| 1 | Vector dimension mismatch: expected 1536 got 768 |
嵌入維度與表定義不一致 | 統一嵌入模型維度或修改VectorType參數 |
| 2 | HNSW index build OOM |
ef_construction過大或資料量超記憶體 | 降低ef_construction至64,分批構建索引 |
| 3 | vec_cosine_distance function not found |
TiDB版本低於8.0或未啟用向量功能 | 升級TiDB至8.0+,確認向量外掛已載入 |
| 4 | FULLTEXT index not found on column content |
未建立全文索引 | ALTER TABLE knowledge_docs ADD FULLTEXT INDEX ft_content(content) |
| 5 | Query timeout during vector search |
HNSW ef_search過大或資料量超預期 | 降低ef_search,添加標量過濾縮小範圍 |
| 6 | Duplicate entry for key 'PRIMARY' |
批量寫入時ID衝突 | 使用auto_increment或顯式指定ID範圍 |
| 7 | Embedding API rate limit exceeded |
嵌入API呼叫頻率超限 | 增加請求間隔,使用batch介面,實施退避重試 |
| 8 | TiDB connection pool exhausted |
併發查詢過多,連接未釋放 | 配置連接池大小,使用with語句確保連接歸還 |
| 9 | Index invisible after bulk load |
批量寫入後忘記恢復索引可見性 | 執行ALTER INDEX idx_vec VISIBLE並驗證 |
| 10 | RRF fusion returns empty results |
向量檢索和全文檢索均無匹配 | 放寬相似度閾值,增加fallback檢索策略 |
進階優化:4個關鍵技巧
1. HNSW參數自適應調優
class HNSWAutoTuner:
def __init__(self, engine):
self.engine = engine
def recommend_params(self, row_count: int,
recall_target: float = 0.95) -> dict:
if row_count < 100_000:
return {"ef_construction": 64, "m": 8, "ef_search": 40}
elif row_count < 1_000_000:
return {"ef_construction": 128, "m": 16, "ef_search": 64}
else:
return {"ef_construction": 256, "m": 24, "ef_search": 128}
def apply(self, table: str, params: dict):
with self.engine.connect() as conn:
conn.execute(
f"ALTER TABLE {table} ADD VECTOR INDEX idx_vec "
f"USING HNSW (embedding) WITH "
f"(ef_construction={params['ef_construction']}, m={params['m']})"
)
conn.execute(
f"SET SESSION tidb_vector_ef_search = {params['ef_search']}"
)
conn.commit()
2. 嵌入快取避免重複計算
import hashlib
import json
class EmbeddingCache:
def __init__(self, engine, ttl_hours: int = 24):
self.engine = engine
self.ttl = ttl_hours
self._local: dict[str, list[float]] = {}
def _cache_key(self, text: str, model: str) -> str:
raw = f"{model}:{text}"
return hashlib.md5(raw.encode()).hexdigest()
def get_or_compute(self, text: str, model: str,
compute_fn) -> list[float]:
key = self._cache_key(text, model)
if key in self._local:
return self._local[key]
emb = compute_fn(text, model)
self._local[key] = emb
return emb
3. 查詢路由:自動選擇檢索策略
class QueryRouter:
def route(self, query: str) -> str:
has_exact_terms = any(
kw in query for kw in ["編號", "ID", "代碼", "版本號"]
)
is_recent = any(
kw in query for kw in ["最新", "最近", "今天", "本週"]
)
if has_exact_terms and is_recent:
return "fused"
elif has_exact_terms:
return "fulltext"
elif is_recent:
return "hybrid"
else:
return "vector"
4. 向量檢索效能監控
class VectorSearchMonitor:
def __init__(self, engine):
self.engine = engine
def get_index_stats(self, table: str) -> dict:
with self.engine.connect() as conn:
count = conn.execute(
f"SELECT COUNT(*) FROM {table}"
).scalar()
null_embs = conn.execute(
f"SELECT COUNT(*) FROM {table} WHERE embedding IS NULL"
).scalar()
return {
"total_rows": count,
"null_embeddings": null_embs,
"coverage": round((count - null_embs) / count, 4) if count else 0,
}
對比分析:4種向量檢索方案全面對比
| 維度 | TiDB Vector | Pinecone | Milvus | pgvector |
|---|---|---|---|---|
| SQL+向量融合 | 原生支援 | 不支援 | 標量過濾 | 原生支援 |
| 事務ACID | 完整ACID | 無 | 有限 | 完整ACID |
| HTAP分析 | 行存+列存 | 無 | 無 | 無 |
| 水平擴展 | 自動擴縮 | 自動 | 手動分片 | 需Citus |
| 索引類型 | HNSW | 自動 | IVF/HNSW/DiskANN | HNSW/IVFFlat |
| 維度上限 | 16384 | 2048 | 32768 | 2000(HNSW) |
| 全文檢索 | 原生FULLTEXT | 無 | 無 | tsvector |
| 部署方式 | 自託管/Serverless | 僅雲 | 自託管/雲 | 自託管 |
| 運維複雜度 | 低(託管) | 極低 | 高 | 中 |
| 適用場景 | RAG+業務融合 | 純向量SaaS | 大規模向量 | PG生態 |
TiDB Vector的核心優勢:一個資料庫搞定SQL+向量+全文+事務,無需資料同步管道。
總結展望
TiDB向量搜尋正在成為2026年RAG生產部署的關鍵基礎設施:
- Serverless向量:TiDB Cloud Serverless按向量查詢量計費,零運維啟動RAG
- 多模態向量:圖像、音訊嵌入與文本嵌入統一存儲,跨模態檢索
- 串流嵌入更新:CDC驅動的自動嵌入重算,資料變更即時反映到向量索引
- 自適應索引:根據查詢模式自動調整HNSW參數,無需手動調優
- RAG評測標準化:TiDB內建檢索品質指標,端到端RAG評測開箱即用
選擇TiDB向量RAG的原則:如果你的RAG需要SQL過濾、事務保障或業務資料融合,TiDB是唯一的一站式選擇。純向量檢索場景可考慮Pinecone,超大規模向量庫可考慮Milvus。起步建議用TiDB Cloud Serverless免費額度驗證效果。
線上工具推薦
本站提供瀏覽器本地工具,免註冊即可試用 →