TiDB向量搜尋RAG實戰:HTAP架構下語義檢索的5個核心模式

数据库

向量資料庫的痛點:為什麼純向量庫撐不住生產RAG?

你的RAG系統上線了,用戶搜「最近三個月的退款政策」,向量檢索返回了一堆語義相近但時間不對的文檔。你想加個時間過濾,卻發現向量資料庫沒有事務,關係查詢和向量查詢是兩套系統,資料同步全靠手動腳本——純向量資料庫在生產RAG中暴露了四大硬傷

  1. 無事務保障:向量寫入和業務資料更新不在同一個事務裡,資料一致性靠運氣
  2. 關係與向量割裂:標量過濾只能做元資料預篩,無法真正融合SQL查詢和語義檢索
  3. 資料同步複雜:業務庫和向量庫雙寫,CDC管道維護成本高,延遲大
  4. 成本翻倍:兩套資料庫意味著雙倍存儲、雙倍運維、雙倍故障點

TiDB向量搜尋用HTAP架構一石三鳥:一個資料庫同時承載事務、分析和向量檢索,SQL與向量查詢天然融合。


核心概念速查

概念 說明 典型實現
TiDB向量搜尋 TiDB原生向量列類型與HNSW索引 TiDB 8.0+
HTAP 混合事務與分析處理,行存+列存 TiDB TiFlash
向量索引 高維向量的近似最近鄰索引結構 HNSW、IVF
HNSW 基於分層可導航小世界圖的ANN算法 ef_construction、max_level
混合查詢 SQL標量過濾與向量相似度聯合查詢 WHERE + ORDER BY vec_cosine
語義檢索 基於語義嵌入的相似度搜尋 cosine、L2、inner product
嵌入模型 將文本映射為稠密向量的神經網絡 text-embedding-3-small、bge-large
RAG 檢索增強生成,LLM+外部知識檢索 LangChain、LlamaIndex
全文檢索 基於倒排索引的關鍵詞搜尋 MATCH AGAINST、FULLTEXT
標量過濾 在向量檢索前/後應用條件過濾 category='tech'、date > '2026-01'

問題分析:TiDB向量RAG的5大挑戰

# 挑戰 具體表現 影響
1 向量與關係資料融合 向量列與業務列在不同表,JOIN查詢效能差 混合查詢延遲高,用戶體驗差
2 查詢效能優化 HNSW參數調優複雜,大資料集召回率與延遲難平衡 檢索慢或召回不足
3 嵌入模型選擇 維度、語言、領域適配差異大 嵌入品質差導致檢索精度低
4 資料更新與索引維護 增量寫入觸發索引重建,大批量更新阻塞在線查詢 寫入期間查詢超時
5 成本控制 向量列佔用存儲大,HTAP叢集資源消耗高 存儲和計算成本超預算

這5個挑戰環環相扣:模型選擇決定向量維度和索引參數,索引參數影響查詢效能,資料更新策略依賴索引機制,成本又受所有因素制約。生產級TiDB向量RAG必須系統性地解決這些問題。


分步實操:5個核心模式

模式1:TiDB向量表設計與索引建立

RAG的第一步——設計融合業務欄位與向量列的表結構,建立HNSW索引。

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Index
from sqlalchemy.orm import declarative_base, Session
from tidb_vector.sqlalchemy import VectorType
from datetime import datetime

Base = declarative_base()


class KnowledgeDoc(Base):
    __tablename__ = "knowledge_docs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    category = Column(String(50), index=True)
    source = Column(String(100))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    embedding = Column(VectorType(1536))

    __table_args__ = (
        Index("idx_category_created", "category", "created_at"),
    )


TIDB_CONN = "mysql+pymysql://root@127.0.0.1:4000/rag_db"
engine = create_engine(TIDB_CONN)
Base.metadata.create_all(engine)


def create_vector_index():
    with engine.connect() as conn:
        conn.execute("""
            ALTER TABLE knowledge_docs
            ADD VECTOR INDEX idx_embedding_vec
            USING HNSW (embedding)
            WITH (ef_construction = 128, m = 16)
        """)
        conn.commit()


create_vector_index()
print("向量表和HNSW索引建立完成")

適用場景:RAG知識庫表設計、多欄位混合查詢需求。


模式2:嵌入生成與批量寫入

將文檔內容通過嵌入模型轉為向量,批量寫入TiDB。

import numpy as np
from openai import OpenAI
from sqlalchemy.orm import Session


class EmbeddingWriter:
    def __init__(self, engine, api_key: str,
                 model: str = "text-embedding-3-small",
                 dim: int = 1536, batch_size: int = 100):
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.dim = dim
        self.batch_size = batch_size

    def generate_embedding(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            input=text, model=self.model, dimensions=self.dim
        )
        return response.data[0].embedding

    def batch_embed(self, texts: list[str]) -> list[list[float]]:
        response = self.client.embeddings.create(
            input=texts, model=self.model, dimensions=self.dim
        )
        return [item.embedding for item in response.data]

    def write_documents(self, docs: list[dict]) -> int:
        written = 0
        for i in range(0, len(docs), self.batch_size):
            batch = docs[i : i + self.batch_size]
            texts = [d["content"] for d in batch]
            embeddings = self.batch_embed(texts)
            with Session(self.engine) as session:
                for doc, emb in zip(batch, embeddings):
                    record = KnowledgeDoc(
                        title=doc["title"],
                        content=doc["content"],
                        category=doc.get("category", "general"),
                        source=doc.get("source", ""),
                        embedding=emb,
                    )
                    session.add(record)
                session.commit()
            written += len(batch)
        return written


writer = EmbeddingWriter(engine, api_key="your-api-key")
docs = [
    {"title": "退款政策", "content": "2026年Q2退款政策:購買30天內可全額退款...", "category": "policy"},
    {"title": "API限流規則", "content": "免費版API限流100次/分鐘,付費版1000次/分鐘...", "category": "tech"},
]
count = writer.write_documents(docs)
print(f"寫入{count}條文檔")

適用場景:知識庫批量匯入、增量文檔嵌入寫入。


模式3:語義檢索與標量過濾混合查詢

TiDB的核心優勢——在一條SQL中同時完成語義檢索和業務條件過濾。

from dataclasses import dataclass


@dataclass
class SearchResult:
    id: int
    title: str
    content: str
    category: str
    similarity: float


class HybridRetriever:
    def __init__(self, engine, api_key: str,
                 model: str = "text-embedding-3-small"):
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def search(self, query: str, category: str | None = None,
               date_after: str | None = None,
               top_k: int = 5) -> list[SearchResult]:
        query_emb = self.client.embeddings.create(
            input=query, model=self.model
        ).data[0].embedding

        emb_str = str(query_emb)
        sql = """
            SELECT id, title, content, category,
                   1 - vec_cosine_distance(embedding, %s) AS similarity
            FROM knowledge_docs
            WHERE 1=1
        """
        params: list = [emb_str]

        if category:
            sql += " AND category = %s"
            params.append(category)
        if date_after:
            sql += " AND created_at > %s"
            params.append(date_after)

        sql += " ORDER BY similarity DESC LIMIT %s"
        params.append(top_k)

        with self.engine.connect() as conn:
            rows = conn.execute(sql, params).fetchall()

        return [
            SearchResult(
                id=r[0], title=r[1], content=r[2],
                category=r[3], similarity=round(r[4], 4)
            )
            for r in rows
        ]


retriever = HybridRetriever(engine, api_key="your-api-key")
results = retriever.search(
    query="退款政策", category="policy", date_after="2026-01-01", top_k=3
)
for r in results:
    print(f"[{r.similarity:.3f}] {r.title} ({r.category})")

適用場景:帶業務條件的語義搜尋、多維度過濾檢索。


模式4:全文檢索與向量檢索融合

關鍵詞精確匹配+語義理解雙路召回,用RRF算法融合排序。

from dataclasses import dataclass


@dataclass
class FusedResult:
    id: int
    title: str
    content: str
    rrf_score: float
    vector_rank: int
    fulltext_rank: int


class FusedRetriever:
    def __init__(self, engine, api_key: str, k: int = 60):
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.k = k

    def fused_search(self, query: str, top_k: int = 5) -> list[FusedResult]:
        query_emb = self.client.embeddings.create(
            input=query, model="text-embedding-3-small"
        ).data[0].embedding
        emb_str = str(query_emb)

        vec_sql = """
            SELECT id, title, content,
                   ROW_NUMBER() OVER (ORDER BY vec_cosine_distance(embedding, %s)) AS vec_rank
            FROM knowledge_docs
            ORDER BY vec_cosine_distance(embedding, %s)
            LIMIT 50
        """
        ft_sql = """
            SELECT id, title, content,
                   ROW_NUMBER() OVER (ORDER BY MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)) AS ft_rank
            FROM knowledge_docs
            WHERE MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)
            LIMIT 50
        """

        with self.engine.connect() as conn:
            vec_rows = conn.execute(vec_sql, [emb_str, emb_str]).fetchall()
            ft_rows = conn.execute(ft_sql, [query, query]).fetchall()

        rrf_scores: dict[int, dict] = {}
        for row in vec_rows:
            doc_id = row[0]
            rrf_scores[doc_id] = {
                "title": row[1], "content": row[2],
                "vec_rank": row[3], "ft_rank": 9999,
                "rrf": 1.0 / (self.k + row[3]),
            }
        for row in ft_rows:
            doc_id = row[0]
            if doc_id in rrf_scores:
                rrf_scores[doc_id]["ft_rank"] = row[3]
                rrf_scores[doc_id]["rrf"] += 1.0 / (self.k + row[3])
            else:
                rrf_scores[doc_id] = {
                    "title": row[1], "content": row[2],
                    "vec_rank": 9999, "ft_rank": row[3],
                    "rrf": 1.0 / (self.k + row[3]),
                }

        sorted_results = sorted(
            rrf_scores.items(), key=lambda x: x[1]["rrf"], reverse=True
        )[:top_k]

        return [
            FusedResult(
                id=doc_id, title=v["title"], content=v["content"],
                rrf_score=round(v["rrf"], 5),
                vector_rank=v["vec_rank"], fulltext_rank=v["ft_rank"],
            )
            for doc_id, v in sorted_results
        ]


fused = FusedRetriever(engine, api_key="your-api-key")
results = fused.fused_search("退款政策 2026", top_k=5)
for r in results:
    print(f"[RRF:{r.rrf_score:.4f} V:{r.vector_rank} F:{r.fulltext_rank}] {r.title}")

適用場景:關鍵詞+語義雙路召回、專業術語精確匹配需求。


模式5:端到端RAG應用構建

將檢索、重排、生成串聯為完整RAG管線。

from dataclasses import dataclass


@dataclass
class RAGConfig:
    retrieval_top_k: int = 10
    rerank_top_n: int = 3
    max_context_tokens: int = 3000
    llm_model: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"


class TiDBRAGPipeline:
    def __init__(self, engine, api_key: str, config: RAGConfig | None = None):
        self.config = config or RAGConfig()
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.retriever = HybridRetriever(
            engine, api_key, model=self.config.embedding_model
        )

    def _rerank(self, query: str, results: list[SearchResult]) -> list[SearchResult]:
        scored = []
        for r in results:
            overlap = sum(
                1 for w in query if w in r.content
            ) / max(len(query), 1)
            final_score = 0.7 * r.similarity + 0.3 * overlap
            scored.append((r, final_score))
        scored.sort(key=lambda x: x[1], reverse=True)
        return [r for r, _ in scored[: self.config.rerank_top_n]]

    def query(self, question: str, category: str | None = None) -> str:
        results = self.retriever.search(
            query=question, category=category,
            top_k=self.config.retrieval_top_k
        )
        reranked = self._rerank(question, results)
        context = "\n\n".join(
            f"【{r.title}】({r.category})\n{r.content}" for r in reranked
        )
        if len(context) > self.config.max_context_tokens * 4:
            context = context[: self.config.max_context_tokens * 4]

        prompt = (
            "基於以下檢索結果回答問題。如果上下文資訊不足,請明確說明。\n\n"
            f"上下文:\n{context}\n\n問題:{question}"
        )
        response = self.client.chat.completions.create(
            model=self.config.llm_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=800,
        )
        return response.choices[0].message.content


pipeline = TiDBRAGPipeline(engine, api_key="your-api-key")
answer = pipeline.query("2026年Q2的退款政策是什麼?", category="policy")
print(answer)

適用場景:生產級RAG問答系統、企業知識庫智慧助手。


避坑指南:5個常見陷阱

陷阱1:向量列不建索引直接查

錯誤做法

SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;

正確做法

ALTER TABLE knowledge_docs
ADD VECTOR INDEX idx_vec USING HNSW (embedding)
WITH (ef_construction = 128, m = 16);

-- 查詢時指定ef_search
SET SESSION tidb_vector_ef_search = 64;
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;

陷阱2:嵌入維度與索引維度不匹配

錯誤做法

embedding = Column(VectorType(768))  # 表定義768維
# 但寫入時用了text-embedding-3-small的1536維向量

正確做法

EMBEDDING_DIM = 1536

class KnowledgeDoc(Base):
    embedding = Column(VectorType(EMBEDDING_DIM))

# 嵌入生成時顯式指定維度
response = client.embeddings.create(
    input=text, model="text-embedding-3-small", dimensions=EMBEDDING_DIM
)

陷阱3:大批量寫入不關閉索引

錯誤做法

for doc in large_doc_list:
    session.add(KnowledgeDoc(embedding=doc["emb"]))
session.commit()  # 每條寫入都觸發索引更新

正確做法

# 先關閉向量索引,批量寫入後再重建
with engine.connect() as conn:
    conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec INVISIBLE")
    conn.commit()

bulk_write(large_doc_list)

with engine.connect() as conn:
    conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec VISIBLE")
    conn.commit()

陷阱4:混合查詢把標量過濾放在向量排序之後

錯誤做法

SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 100;
-- 然後在應用層過濾category

正確做法

SELECT * FROM knowledge_docs
WHERE category = 'policy' AND created_at > '2026-01-01'
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 10;

陷阱5:RAG上下文不做截斷直接餵LLM

錯誤做法

context = "\n".join(r.content for r in all_results)
prompt = f"上下文:{context}\n問題:{question}"

正確做法

MAX_CTX_TOKENS = 3000

def truncate_context(results: list[SearchResult]) -> str:
    parts, total = [], 0
    for r in results:
        est_tokens = len(r.content) // 4
        if total + est_tokens > MAX_CTX_TOKENS:
            break
        parts.append(f"【{r.title}】\n{r.content}")
        total += est_tokens
    return "\n\n".join(parts)

報錯排查:10個常見錯誤

# 錯誤資訊 原因 解決方案
1 Vector dimension mismatch: expected 1536 got 768 嵌入維度與表定義不一致 統一嵌入模型維度或修改VectorType參數
2 HNSW index build OOM ef_construction過大或資料量超記憶體 降低ef_construction至64,分批構建索引
3 vec_cosine_distance function not found TiDB版本低於8.0或未啟用向量功能 升級TiDB至8.0+,確認向量外掛已載入
4 FULLTEXT index not found on column content 未建立全文索引 ALTER TABLE knowledge_docs ADD FULLTEXT INDEX ft_content(content)
5 Query timeout during vector search HNSW ef_search過大或資料量超預期 降低ef_search,添加標量過濾縮小範圍
6 Duplicate entry for key 'PRIMARY' 批量寫入時ID衝突 使用auto_increment或顯式指定ID範圍
7 Embedding API rate limit exceeded 嵌入API呼叫頻率超限 增加請求間隔,使用batch介面,實施退避重試
8 TiDB connection pool exhausted 併發查詢過多,連接未釋放 配置連接池大小,使用with語句確保連接歸還
9 Index invisible after bulk load 批量寫入後忘記恢復索引可見性 執行ALTER INDEX idx_vec VISIBLE並驗證
10 RRF fusion returns empty results 向量檢索和全文檢索均無匹配 放寬相似度閾值,增加fallback檢索策略

進階優化:4個關鍵技巧

1. HNSW參數自適應調優

class HNSWAutoTuner:
    def __init__(self, engine):
        self.engine = engine

    def recommend_params(self, row_count: int,
                         recall_target: float = 0.95) -> dict:
        if row_count < 100_000:
            return {"ef_construction": 64, "m": 8, "ef_search": 40}
        elif row_count < 1_000_000:
            return {"ef_construction": 128, "m": 16, "ef_search": 64}
        else:
            return {"ef_construction": 256, "m": 24, "ef_search": 128}

    def apply(self, table: str, params: dict):
        with self.engine.connect() as conn:
            conn.execute(
                f"ALTER TABLE {table} ADD VECTOR INDEX idx_vec "
                f"USING HNSW (embedding) WITH "
                f"(ef_construction={params['ef_construction']}, m={params['m']})"
            )
            conn.execute(
                f"SET SESSION tidb_vector_ef_search = {params['ef_search']}"
            )
            conn.commit()

2. 嵌入快取避免重複計算

import hashlib
import json


class EmbeddingCache:
    def __init__(self, engine, ttl_hours: int = 24):
        self.engine = engine
        self.ttl = ttl_hours
        self._local: dict[str, list[float]] = {}

    def _cache_key(self, text: str, model: str) -> str:
        raw = f"{model}:{text}"
        return hashlib.md5(raw.encode()).hexdigest()

    def get_or_compute(self, text: str, model: str,
                       compute_fn) -> list[float]:
        key = self._cache_key(text, model)
        if key in self._local:
            return self._local[key]
        emb = compute_fn(text, model)
        self._local[key] = emb
        return emb

3. 查詢路由:自動選擇檢索策略

class QueryRouter:
    def route(self, query: str) -> str:
        has_exact_terms = any(
            kw in query for kw in ["編號", "ID", "代碼", "版本號"]
        )
        is_recent = any(
            kw in query for kw in ["最新", "最近", "今天", "本週"]
        )
        if has_exact_terms and is_recent:
            return "fused"
        elif has_exact_terms:
            return "fulltext"
        elif is_recent:
            return "hybrid"
        else:
            return "vector"

4. 向量檢索效能監控

class VectorSearchMonitor:
    def __init__(self, engine):
        self.engine = engine

    def get_index_stats(self, table: str) -> dict:
        with self.engine.connect() as conn:
            count = conn.execute(
                f"SELECT COUNT(*) FROM {table}"
            ).scalar()
            null_embs = conn.execute(
                f"SELECT COUNT(*) FROM {table} WHERE embedding IS NULL"
            ).scalar()
        return {
            "total_rows": count,
            "null_embeddings": null_embs,
            "coverage": round((count - null_embs) / count, 4) if count else 0,
        }

對比分析:4種向量檢索方案全面對比

維度 TiDB Vector Pinecone Milvus pgvector
SQL+向量融合 原生支援 不支援 標量過濾 原生支援
事務ACID 完整ACID 有限 完整ACID
HTAP分析 行存+列存
水平擴展 自動擴縮 自動 手動分片 需Citus
索引類型 HNSW 自動 IVF/HNSW/DiskANN HNSW/IVFFlat
維度上限 16384 2048 32768 2000(HNSW)
全文檢索 原生FULLTEXT tsvector
部署方式 自託管/Serverless 僅雲 自託管/雲 自託管
運維複雜度 低(託管) 極低
適用場景 RAG+業務融合 純向量SaaS 大規模向量 PG生態

TiDB Vector的核心優勢:一個資料庫搞定SQL+向量+全文+事務,無需資料同步管道。


總結展望

TiDB向量搜尋正在成為2026年RAG生產部署的關鍵基礎設施:

  1. Serverless向量:TiDB Cloud Serverless按向量查詢量計費,零運維啟動RAG
  2. 多模態向量:圖像、音訊嵌入與文本嵌入統一存儲,跨模態檢索
  3. 串流嵌入更新:CDC驅動的自動嵌入重算,資料變更即時反映到向量索引
  4. 自適應索引:根據查詢模式自動調整HNSW參數,無需手動調優
  5. RAG評測標準化:TiDB內建檢索品質指標,端到端RAG評測開箱即用

選擇TiDB向量RAG的原則:如果你的RAG需要SQL過濾、事務保障或業務資料融合,TiDB是唯一的一站式選擇。純向量檢索場景可考慮Pinecone,超大規模向量庫可考慮Milvus。起步建議用TiDB Cloud Serverless免費額度驗證效果。


線上工具推薦

  • JSON格式化 — 格式化嵌入向量和檢索結果的JSON資料
  • 雜湊計算 — 計算文檔指紋和嵌入快取的MD5/SHA雜湊
  • Curl轉程式碼 — 將TiDB API和嵌入介面除錯curl轉為Python程式碼

本站提供瀏覽器本地工具,免註冊即可試用 →

#TiDB向量搜索#RAG生产部署#向量数据库#HTAP#语义检索#2026#数据库