TiDB向量搜索RAG实战:HTAP架构下语义检索的5个核心模式

数据库

向量数据库的痛点:为什么纯向量库撑不住生产RAG?

你的RAG系统上线了,用户搜"最近三个月的退款政策",向量检索返回了一堆语义相近但时间不对的文档。你想加个时间过滤,却发现向量数据库没有事务,关系查询和向量查询是两套系统,数据同步全靠手动脚本——纯向量数据库在生产RAG中暴露了四大硬伤

  1. 无事务保障:向量写入和业务数据更新不在同一个事务里,数据一致性靠运气
  2. 关系与向量割裂:标量过滤只能做元数据预筛,无法真正融合SQL查询和语义检索
  3. 数据同步复杂:业务库和向量库双写,CDC管道维护成本高,延迟大
  4. 成本翻倍:两套数据库意味着双倍存储、双倍运维、双倍故障点

TiDB向量搜索用HTAP架构一石三鸟:一个数据库同时承载事务、分析和向量检索,SQL与向量查询天然融合。


核心概念速查

概念 说明 典型实现
TiDB向量搜索 TiDB原生向量列类型与HNSW索引 TiDB 8.0+
HTAP 混合事务与分析处理,行存+列存 TiDB TiFlash
向量索引 高维向量的近似最近邻索引结构 HNSW、IVF
HNSW 基于分层可导航小世界图的ANN算法 ef_construction、max_level
混合查询 SQL标量过滤与向量相似度联合查询 WHERE + ORDER BY vec_cosine
语义检索 基于语义嵌入的相似度搜索 cosine、L2、inner product
嵌入模型 将文本映射为稠密向量的神经网络 text-embedding-3-small、bge-large
RAG 检索增强生成,LLM+外部知识检索 LangChain、LlamaIndex
全文检索 基于倒排索引的关键词搜索 MATCH AGAINST、FULLTEXT
标量过滤 在向量检索前/后应用条件过滤 category='tech'、date > '2026-01'

问题分析:TiDB向量RAG的5大挑战

# 挑战 具体表现 影响
1 向量与关系数据融合 向量列与业务列在不同表,JOIN查询性能差 混合查询延迟高,用户体验差
2 查询性能优化 HNSW参数调优复杂,大数据集召回率与延迟难平衡 检索慢或召回不足
3 嵌入模型选择 维度、语言、领域适配差异大 嵌入质量差导致检索精度低
4 数据更新与索引维护 增量写入触发索引重建,大批量更新阻塞在线查询 写入期间查询超时
5 成本控制 向量列占用存储大,HTAP集群资源消耗高 存储和计算成本超预算

这5个挑战环环相扣:模型选择决定向量维度和索引参数,索引参数影响查询性能,数据更新策略依赖索引机制,成本又受所有因素制约。生产级TiDB向量RAG必须系统性地解决这些问题。


分步实操:5个核心模式

模式1:TiDB向量表设计与索引创建

RAG的第一步——设计融合业务字段与向量列的表结构,创建HNSW索引。

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Index
from sqlalchemy.orm import declarative_base, Session
from tidb_vector.sqlalchemy import VectorType
from datetime import datetime

Base = declarative_base()


class KnowledgeDoc(Base):
    __tablename__ = "knowledge_docs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    category = Column(String(50), index=True)
    source = Column(String(100))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    embedding = Column(VectorType(1536))

    __table_args__ = (
        Index("idx_category_created", "category", "created_at"),
    )


TIDB_CONN = "mysql+pymysql://root@127.0.0.1:4000/rag_db"
engine = create_engine(TIDB_CONN)
Base.metadata.create_all(engine)


def create_vector_index():
    with engine.connect() as conn:
        conn.execute("""
            ALTER TABLE knowledge_docs
            ADD VECTOR INDEX idx_embedding_vec
            USING HNSW (embedding)
            WITH (ef_construction = 128, m = 16)
        """)
        conn.commit()


create_vector_index()
print("向量表和HNSW索引创建完成")

适用场景:RAG知识库表设计、多字段混合查询需求。


模式2:嵌入生成与批量写入

将文档内容通过嵌入模型转为向量,批量写入TiDB。

import numpy as np
from openai import OpenAI
from sqlalchemy.orm import Session


class EmbeddingWriter:
    def __init__(self, engine, api_key: str,
                 model: str = "text-embedding-3-small",
                 dim: int = 1536, batch_size: int = 100):
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.dim = dim
        self.batch_size = batch_size

    def generate_embedding(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            input=text, model=self.model, dimensions=self.dim
        )
        return response.data[0].embedding

    def batch_embed(self, texts: list[str]) -> list[list[float]]:
        response = self.client.embeddings.create(
            input=texts, model=self.model, dimensions=self.dim
        )
        return [item.embedding for item in response.data]

    def write_documents(self, docs: list[dict]) -> int:
        written = 0
        for i in range(0, len(docs), self.batch_size):
            batch = docs[i : i + self.batch_size]
            texts = [d["content"] for d in batch]
            embeddings = self.batch_embed(texts)
            with Session(self.engine) as session:
                for doc, emb in zip(batch, embeddings):
                    record = KnowledgeDoc(
                        title=doc["title"],
                        content=doc["content"],
                        category=doc.get("category", "general"),
                        source=doc.get("source", ""),
                        embedding=emb,
                    )
                    session.add(record)
                session.commit()
            written += len(batch)
        return written


writer = EmbeddingWriter(engine, api_key="your-api-key")
docs = [
    {"title": "退款政策", "content": "2026年Q2退款政策:购买30天内可全额退款...", "category": "policy"},
    {"title": "API限流规则", "content": "免费版API限流100次/分钟,付费版1000次/分钟...", "category": "tech"},
]
count = writer.write_documents(docs)
print(f"写入{count}条文档")

适用场景:知识库批量导入、增量文档嵌入写入。


模式3:语义检索与标量过滤混合查询

TiDB的核心优势——在一条SQL中同时完成语义检索和业务条件过滤。

from dataclasses import dataclass


@dataclass
class SearchResult:
    id: int
    title: str
    content: str
    category: str
    similarity: float


class HybridRetriever:
    def __init__(self, engine, api_key: str,
                 model: str = "text-embedding-3-small"):
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def search(self, query: str, category: str | None = None,
               date_after: str | None = None,
               top_k: int = 5) -> list[SearchResult]:
        query_emb = self.client.embeddings.create(
            input=query, model=self.model
        ).data[0].embedding

        emb_str = str(query_emb)
        sql = """
            SELECT id, title, content, category,
                   1 - vec_cosine_distance(embedding, %s) AS similarity
            FROM knowledge_docs
            WHERE 1=1
        """
        params: list = [emb_str]

        if category:
            sql += " AND category = %s"
            params.append(category)
        if date_after:
            sql += " AND created_at > %s"
            params.append(date_after)

        sql += " ORDER BY similarity DESC LIMIT %s"
        params.append(top_k)

        with self.engine.connect() as conn:
            rows = conn.execute(sql, params).fetchall()

        return [
            SearchResult(
                id=r[0], title=r[1], content=r[2],
                category=r[3], similarity=round(r[4], 4)
            )
            for r in rows
        ]


retriever = HybridRetriever(engine, api_key="your-api-key")
results = retriever.search(
    query="退款政策", category="policy", date_after="2026-01-01", top_k=3
)
for r in results:
    print(f"[{r.similarity:.3f}] {r.title} ({r.category})")

适用场景:带业务条件的语义搜索、多维度过滤检索。


模式4:全文检索与向量检索融合

关键词精确匹配+语义理解双路召回,用RRF算法融合排序。

from dataclasses import dataclass


@dataclass
class FusedResult:
    id: int
    title: str
    content: str
    rrf_score: float
    vector_rank: int
    fulltext_rank: int


class FusedRetriever:
    def __init__(self, engine, api_key: str, k: int = 60):
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.k = k

    def fused_search(self, query: str, top_k: int = 5) -> list[FusedResult]:
        query_emb = self.client.embeddings.create(
            input=query, model="text-embedding-3-small"
        ).data[0].embedding
        emb_str = str(query_emb)

        vec_sql = """
            SELECT id, title, content,
                   ROW_NUMBER() OVER (ORDER BY vec_cosine_distance(embedding, %s)) AS vec_rank
            FROM knowledge_docs
            ORDER BY vec_cosine_distance(embedding, %s)
            LIMIT 50
        """
        ft_sql = """
            SELECT id, title, content,
                   ROW_NUMBER() OVER (ORDER BY MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)) AS ft_rank
            FROM knowledge_docs
            WHERE MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)
            LIMIT 50
        """

        with self.engine.connect() as conn:
            vec_rows = conn.execute(vec_sql, [emb_str, emb_str]).fetchall()
            ft_rows = conn.execute(ft_sql, [query, query]).fetchall()

        rrf_scores: dict[int, dict] = {}
        for row in vec_rows:
            doc_id = row[0]
            rrf_scores[doc_id] = {
                "title": row[1], "content": row[2],
                "vec_rank": row[3], "ft_rank": 9999,
                "rrf": 1.0 / (self.k + row[3]),
            }
        for row in ft_rows:
            doc_id = row[0]
            if doc_id in rrf_scores:
                rrf_scores[doc_id]["ft_rank"] = row[3]
                rrf_scores[doc_id]["rrf"] += 1.0 / (self.k + row[3])
            else:
                rrf_scores[doc_id] = {
                    "title": row[1], "content": row[2],
                    "vec_rank": 9999, "ft_rank": row[3],
                    "rrf": 1.0 / (self.k + row[3]),
                }

        sorted_results = sorted(
            rrf_scores.items(), key=lambda x: x[1]["rrf"], reverse=True
        )[:top_k]

        return [
            FusedResult(
                id=doc_id, title=v["title"], content=v["content"],
                rrf_score=round(v["rrf"], 5),
                vector_rank=v["vec_rank"], fulltext_rank=v["ft_rank"],
            )
            for doc_id, v in sorted_results
        ]


fused = FusedRetriever(engine, api_key="your-api-key")
results = fused.fused_search("退款政策 2026", top_k=5)
for r in results:
    print(f"[RRF:{r.rrf_score:.4f} V:{r.vector_rank} F:{r.fulltext_rank}] {r.title}")

适用场景:关键词+语义双路召回、专业术语精确匹配需求。


模式5:端到端RAG应用构建

将检索、重排、生成串联为完整RAG管线。

from dataclasses import dataclass


@dataclass
class RAGConfig:
    retrieval_top_k: int = 10
    rerank_top_n: int = 3
    max_context_tokens: int = 3000
    llm_model: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"


class TiDBRAGPipeline:
    def __init__(self, engine, api_key: str, config: RAGConfig | None = None):
        self.config = config or RAGConfig()
        self.engine = engine
        self.client = OpenAI(api_key=api_key)
        self.retriever = HybridRetriever(
            engine, api_key, model=self.config.embedding_model
        )

    def _rerank(self, query: str, results: list[SearchResult]) -> list[SearchResult]:
        scored = []
        for r in results:
            overlap = sum(
                1 for w in query if w in r.content
            ) / max(len(query), 1)
            final_score = 0.7 * r.similarity + 0.3 * overlap
            scored.append((r, final_score))
        scored.sort(key=lambda x: x[1], reverse=True)
        return [r for r, _ in scored[: self.config.rerank_top_n]]

    def query(self, question: str, category: str | None = None) -> str:
        results = self.retriever.search(
            query=question, category=category,
            top_k=self.config.retrieval_top_k
        )
        reranked = self._rerank(question, results)
        context = "\n\n".join(
            f"【{r.title}】({r.category})\n{r.content}" for r in reranked
        )
        if len(context) > self.config.max_context_tokens * 4:
            context = context[: self.config.max_context_tokens * 4]

        prompt = (
            "基于以下检索结果回答问题。如果上下文信息不足,请明确说明。\n\n"
            f"上下文:\n{context}\n\n问题:{question}"
        )
        response = self.client.chat.completions.create(
            model=self.config.llm_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=800,
        )
        return response.choices[0].message.content


pipeline = TiDBRAGPipeline(engine, api_key="your-api-key")
answer = pipeline.query("2026年Q2的退款政策是什么?", category="policy")
print(answer)

适用场景:生产级RAG问答系统、企业知识库智能助手。


避坑指南:5个常见陷阱

陷阱1:向量列不建索引直接查

错误做法

SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;

正确做法

ALTER TABLE knowledge_docs
ADD VECTOR INDEX idx_vec USING HNSW (embedding)
WITH (ef_construction = 128, m = 16);

-- 查询时指定ef_search
SET SESSION tidb_vector_ef_search = 64;
SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @query_emb)
LIMIT 10;

陷阱2:嵌入维度与索引维度不匹配

错误做法

embedding = Column(VectorType(768))  # 表定义768维
# 但写入时用了text-embedding-3-small的1536维向量

正确做法

EMBEDDING_DIM = 1536

class KnowledgeDoc(Base):
    embedding = Column(VectorType(EMBEDDING_DIM))

# 嵌入生成时显式指定维度
response = client.embeddings.create(
    input=text, model="text-embedding-3-small", dimensions=EMBEDDING_DIM
)

陷阱3:大批量写入不关闭索引

错误做法

for doc in large_doc_list:
    session.add(KnowledgeDoc(embedding=doc["emb"]))
session.commit()  # 每条写入都触发索引更新

正确做法

# 先关闭向量索引,批量写入后再重建
with engine.connect() as conn:
    conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec INVISIBLE")
    conn.commit()

bulk_write(large_doc_list)

with engine.connect() as conn:
    conn.execute("ALTER TABLE knowledge_docs ALTER INDEX idx_vec VISIBLE")
    conn.commit()

陷阱4:混合查询把标量过滤放在向量排序之后

错误做法

SELECT * FROM knowledge_docs
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 100;
-- 然后在应用层过滤category

正确做法

SELECT * FROM knowledge_docs
WHERE category = 'policy' AND created_at > '2026-01-01'
ORDER BY vec_cosine_distance(embedding, @emb)
LIMIT 10;

陷阱5:RAG上下文不做截断直接喂LLM

错误做法

context = "\n".join(r.content for r in all_results)
prompt = f"上下文:{context}\n问题:{question}"

正确做法

MAX_CTX_TOKENS = 3000

def truncate_context(results: list[SearchResult]) -> str:
    parts, total = [], 0
    for r in results:
        est_tokens = len(r.content) // 4
        if total + est_tokens > MAX_CTX_TOKENS:
            break
        parts.append(f"【{r.title}】\n{r.content}")
        total += est_tokens
    return "\n\n".join(parts)

报错排查:10个常见错误

# 错误信息 原因 解决方案
1 Vector dimension mismatch: expected 1536 got 768 嵌入维度与表定义不一致 统一嵌入模型维度或修改VectorType参数
2 HNSW index build OOM ef_construction过大或数据量超内存 降低ef_construction至64,分批构建索引
3 vec_cosine_distance function not found TiDB版本低于8.0或未启用向量功能 升级TiDB至8.0+,确认向量插件已加载
4 FULLTEXT index not found on column content 未创建全文索引 ALTER TABLE knowledge_docs ADD FULLTEXT INDEX ft_content(content)
5 Query timeout during vector search HNSW ef_search过大或数据量超预期 降低ef_search,添加标量过滤缩小范围
6 Duplicate entry for key 'PRIMARY' 批量写入时ID冲突 使用auto_increment或显式指定ID范围
7 Embedding API rate limit exceeded 嵌入API调用频率超限 增加请求间隔,使用batch接口,实施退避重试
8 TiDB connection pool exhausted 并发查询过多,连接未释放 配置连接池大小,使用with语句确保连接归还
9 Index invisible after bulk load 批量写入后忘记恢复索引可见性 执行ALTER INDEX idx_vec VISIBLE并验证
10 RRF fusion returns empty results 向量检索和全文检索均无匹配 放宽相似度阈值,增加fallback检索策略

进阶优化:4个关键技巧

1. HNSW参数自适应调优

class HNSWAutoTuner:
    def __init__(self, engine):
        self.engine = engine

    def recommend_params(self, row_count: int,
                         recall_target: float = 0.95) -> dict:
        if row_count < 100_000:
            return {"ef_construction": 64, "m": 8, "ef_search": 40}
        elif row_count < 1_000_000:
            return {"ef_construction": 128, "m": 16, "ef_search": 64}
        else:
            return {"ef_construction": 256, "m": 24, "ef_search": 128}

    def apply(self, table: str, params: dict):
        with self.engine.connect() as conn:
            conn.execute(
                f"ALTER TABLE {table} ADD VECTOR INDEX idx_vec "
                f"USING HNSW (embedding) WITH "
                f"(ef_construction={params['ef_construction']}, m={params['m']})"
            )
            conn.execute(
                f"SET SESSION tidb_vector_ef_search = {params['ef_search']}"
            )
            conn.commit()

2. 嵌入缓存避免重复计算

import hashlib
import json


class EmbeddingCache:
    def __init__(self, engine, ttl_hours: int = 24):
        self.engine = engine
        self.ttl = ttl_hours
        self._local: dict[str, list[float]] = {}

    def _cache_key(self, text: str, model: str) -> str:
        raw = f"{model}:{text}"
        return hashlib.md5(raw.encode()).hexdigest()

    def get_or_compute(self, text: str, model: str,
                       compute_fn) -> list[float]:
        key = self._cache_key(text, model)
        if key in self._local:
            return self._local[key]
        emb = compute_fn(text, model)
        self._local[key] = emb
        return emb

3. 查询路由:自动选择检索策略

class QueryRouter:
    def route(self, query: str) -> str:
        has_exact_terms = any(
            kw in query for kw in ["编号", "ID", "代码", "版本号"]
        )
        is_recent = any(
            kw in query for kw in ["最新", "最近", "今天", "本周"]
        )
        if has_exact_terms and is_recent:
            return "fused"
        elif has_exact_terms:
            return "fulltext"
        elif is_recent:
            return "hybrid"
        else:
            return "vector"

4. 向量检索性能监控

class VectorSearchMonitor:
    def __init__(self, engine):
        self.engine = engine

    def get_index_stats(self, table: str) -> dict:
        with self.engine.connect() as conn:
            count = conn.execute(
                f"SELECT COUNT(*) FROM {table}"
            ).scalar()
            null_embs = conn.execute(
                f"SELECT COUNT(*) FROM {table} WHERE embedding IS NULL"
            ).scalar()
        return {
            "total_rows": count,
            "null_embeddings": null_embs,
            "coverage": round((count - null_embs) / count, 4) if count else 0,
        }

对比分析:4种向量检索方案全面对比

维度 TiDB Vector Pinecone Milvus pgvector
SQL+向量融合 原生支持 不支持 标量过滤 原生支持
事务ACID 完整ACID 有限 完整ACID
HTAP分析 行存+列存
水平扩展 自动扩缩 自动 手动分片 需Citus
索引类型 HNSW 自动 IVF/HNSW/DiskANN HNSW/IVFFlat
维度上限 16384 2048 32768 2000(HNSW)
全文检索 原生FULLTEXT tsvector
部署方式 自托管/Serverless 仅云 自托管/云 自托管
运维复杂度 低(托管) 极低
适用场景 RAG+业务融合 纯向量SaaS 大规模向量 PG生态

TiDB Vector的核心优势:一个数据库搞定SQL+向量+全文+事务,无需数据同步管道。


总结展望

TiDB向量搜索正在成为2026年RAG生产部署的关键基础设施:

  1. Serverless向量:TiDB Cloud Serverless按向量查询量计费,零运维启动RAG
  2. 多模态向量:图像、音频嵌入与文本嵌入统一存储,跨模态检索
  3. 流式嵌入更新:CDC驱动的自动嵌入重算,数据变更实时反映到向量索引
  4. 自适应索引:根据查询模式自动调整HNSW参数,无需手动调优
  5. RAG评测标准化:TiDB内置检索质量指标,端到端RAG评测开箱即用

选择TiDB向量RAG的原则:如果你的RAG需要SQL过滤、事务保障或业务数据融合,TiDB是唯一的一站式选择。纯向量检索场景可考虑Pinecone,超大规模向量库可考虑Milvus。起步建议用TiDB Cloud Serverless免费额度验证效果。


在线工具推荐

  • JSON格式化 — 格式化嵌入向量和检索结果的JSON数据
  • 哈希计算 — 计算文档指纹和嵌入缓存的MD5/SHA哈希
  • Curl转代码 — 将TiDB API和嵌入接口调试curl转为Python代码

本站提供浏览器本地工具,免注册即可试用 →

#TiDB向量搜索#RAG生产部署#向量数据库#HTAP#语义检索#2026#数据库