Python AI Agent记忆系统:7种生产级长期记忆方案

AI与大数据

你的AI Agent为什么总是"失忆"?

你花了3天调通的Agent,上线第一天用户就投诉:"我昨天告诉你的偏好呢?怎么又忘了?"这不是个例——AI Agent记忆系统是2026年生产落地的头号难题。大模型本身没有持久记忆,每次对话都是一张白纸。短期记忆撑不过一个会话,长期记忆又面临检索慢、存储贵、一致性差三座大山。

更扎心的是:很多团队用LangGraph搭了个Agent,把对话历史往列表里一塞就以为搞定了记忆。结果Token爆炸、成本飙升、关键信息被截断,Agent越聊越"傻"。Python Agent长期记忆不是加个数据库就完事的,它需要一套完整的架构设计。


核心概念速查

概念 说明 典型实现
短期记忆 当前会话内的上下文信息 对话历史列表、滑动窗口
长期记忆 跨会话持久化的知识和偏好 向量数据库、关系数据库
情景记忆 对特定事件和经历的回忆 时间索引+向量检索
语义记忆 对概念和知识的结构化理解 知识图谱、本体库
工作记忆 当前推理步骤中的临时信息 Scratchpad、ReAct观察
向量检索 基于语义相似度的记忆召回 Embedding+FAISS/Chroma
记忆压缩 将冗长历史压缩为摘要 LLM摘要、关键信息提取

问题分析:AI Agent记忆系统的5大挑战

# 挑战 具体表现 影响
1 Token窗口溢出 对话历史超过模型上下文长度 关键信息被截断,Agent"失忆"
2 检索精度不足 向量检索返回无关记忆 Agent基于错误信息做决策
3 记忆一致性冲突 新旧记忆矛盾,无法判断谁对 输出自相矛盾,用户信任崩塌
4 冷启动问题 新用户无历史记忆可用 个性化体验差,留存率低
5 成本与延迟权衡 全量记忆检索慢且贵 响应超时或API账单爆炸

这5个问题环环相扣:为了解决Token溢出你压缩记忆,压缩导致信息丢失,丢失又加剧检索精度问题。生产级Agent记忆架构必须系统性地解决这些问题,而不是头痛医头。


分步实操:7种记忆实现模式

模式1:对话缓冲记忆(ConversationBufferMemory)

最简单的记忆模式——把所有对话历史原样存储。适合短对话场景。

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class Message:
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)


class ConversationBufferMemory:
    def __init__(self, max_tokens: int = 4000):
        self.messages: list[Message] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str) -> None:
        self.messages.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.messages]

    def estimate_tokens(self) -> int:
        return sum(len(m.content) // 4 for m in self.messages)

    def is_overflow(self) -> bool:
        return self.estimate_tokens() > self.max_tokens

    def clear(self) -> None:
        self.messages.clear()


memory = ConversationBufferMemory(max_tokens=4000)
memory.add("user", "我喜欢Python,请用Python回答")
memory.add("assistant", "好的,我会用Python来回答你的问题")
memory.add("user", "帮我写一个快排算法")

print(memory.get_context())
print(f"Token估算: {memory.estimate_tokens()}, 溢出: {memory.is_overflow()}")

适用场景:客服机器人、简单问答,对话轮次 < 20。


模式2:滑动窗口记忆(SlidingWindowMemory)

只保留最近K轮对话,自动丢弃更早的历史。Token可控,但会丢失早期信息。

from collections import deque


class SlidingWindowMemory:
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        self.buffer.append(Message(role=role, content=content))
        while len(self.buffer) > self.window_size * 2:
            self.buffer.popleft()

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        return len(self.buffer)


window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    window_memory.add("user", f"第{i+1}个问题")
    window_memory.add("assistant", f"第{i+1}个回答")

print(f"窗口大小: {window_memory.size()}")
print(f"最近2轮: {window_memory.get_recent(k=2)}")

适用场景:长对话场景,只关心近期上下文,如代码调试助手。


模式3:摘要压缩记忆(SummaryCompressedMemory)

用LLM将历史对话压缩为摘要,保留关键信息的同时大幅减少Token占用。这是LangGraph记忆管理的核心思路之一。

from openai import OpenAI


class SummaryCompressedMemory:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in self.recent_messages[:-2]
        )
        prompt = (
            f"请将以下对话历史压缩为一段简洁的摘要,保留所有关键信息、"
            f"用户偏好和重要决策:\n\n{conversation_text}\n\n"
            f"当前已有摘要:{self.summary}\n\n请输出合并后的新摘要:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        self.summary = response.choices[0].message.content
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"对话历史摘要:{self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context


summary_memory = SummaryCompressedMemory(
    api_key="your-api-key", max_raw_messages=6
)
for i in range(8):
    summary_memory.add("user", f"我想了解Python的{i+1}号特性")
    summary_memory.add("assistant", f"Python的{i+1}号特性是...")

print(f"上下文条目数: {len(summary_memory.get_context())}")

适用场景:多轮深度对话、咨询类Agent,需要保留长期语义。


模式4:向量语义记忆(VectorSemanticMemory)

将记忆向量化存储,通过语义相似度检索相关记忆。这是向量数据库记忆的核心实现。

import numpy as np
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MemoryItem:
    content: str
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)


class VectorSemanticMemory:
    def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
        self.embedding_dim = embedding_dim
        self.top_k = top_k
        self.memories: list[MemoryItem] = []

    def add(self, content: str, embedding: np.ndarray,
            metadata: dict | None = None) -> None:
        self.memories.append(MemoryItem(
            content=content,
            embedding=embedding,
            metadata=metadata or {},
        ))

    def search(self, query_embedding: np.ndarray,
               top_k: int | None = None) -> list[dict]:
        k = top_k or self.top_k
        if not self.memories:
            return []
        scores = []
        for mem in self.memories:
            sim = float(np.dot(query_embedding, mem.embedding) /
                        (np.linalg.norm(query_embedding) *
                         np.linalg.norm(mem.embedding) + 1e-8))
            scores.append((sim, mem))
        scores.sort(key=lambda x: x[0], reverse=True)
        return [
            {
                "content": mem.content,
                "score": score,
                "timestamp": mem.timestamp.isoformat(),
                "metadata": mem.metadata,
            }
            for score, mem in scores[:k]
        ]

    def delete_old(self, before: datetime) -> int:
        original_len = len(self.memories)
        self.memories = [m for m in self.memories if m.timestamp >= before]
        return original_len - len(self.memories)


vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    fake_embedding = np.random.randn(128)
    fake_embedding /= np.linalg.norm(fake_embedding)
    vector_memory.add(
        content=f"用户偏好记录{i+1}:喜欢Python和Rust",
        embedding=fake_embedding,
        metadata={"source": "chat", "turn": i},
    )

query = np.random.randn(128)
query /= np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for r in results:
    print(f"[{r['score']:.4f}] {r['content']}")

适用场景:RAG增强的Agent、个性化推荐、跨会话知识检索。生产环境建议用Chroma或Milvus替换内存存储。


模式5:情景记忆(EpisodicMemory)

记录Agent经历的特定事件,支持按时间和语义双重检索。

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class EmotionTag(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


@dataclass
class Episode:
    event: str
    context: str
    timestamp: datetime = field(default_factory=datetime.now)
    emotion: EmotionTag = EmotionTag.NEUTRAL
    importance: float = 0.5
    embedding: np.ndarray | None = None


class EpisodicMemory:
    def __init__(self, max_episodes: int = 1000):
        self.episodes: list[Episode] = []
        self.max_episodes = max_episodes

    def record(self, event: str, context: str,
               emotion: EmotionTag = EmotionTag.NEUTRAL,
               importance: float = 0.5,
               embedding: np.ndarray | None = None) -> None:
        self.episodes.append(Episode(
            event=event, context=context, emotion=emotion,
            importance=importance, embedding=embedding,
        ))
        if len(self.episodes) > self.max_episodes:
            self._evict()

    def _evict(self) -> None:
        self.episodes.sort(key=lambda e: e.importance, reverse=True)
        self.episodes = self.episodes[:self.max_episodes]

    def recall_by_time(self, start: datetime,
                       end: datetime) -> list[Episode]:
        return [
            ep for ep in self.episodes
            if start <= ep.timestamp <= end
        ]

    def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
        return [ep for ep in self.episodes if ep.importance >= threshold]

    def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
        return [ep for ep in self.episodes if ep.emotion == emotion]

    def get_recent(self, k: int = 5) -> list[Episode]:
        return self.episodes[-k:]


episodic_mem = EpisodicMemory(max_episodes=100)
episodic_mem.record(
    event="用户反馈API响应慢",
    context="用户在高峰期调用/v2/predict接口",
    emotion=EmotionTag.NEGATIVE,
    importance=0.9,
)
episodic_mem.record(
    event="用户完成首次部署",
    context="使用Docker Compose部署成功",
    emotion=EmotionTag.POSITIVE,
    importance=0.7,
)

important = episodic_mem.recall_by_importance(0.8)
print(f"重要事件数: {len(important)}")
for ep in important:
    print(f"  [{ep.emotion.value}] {ep.event}")

适用场景:客服Agent记录用户投诉、运维Agent记录故障事件。


模式6:知识图谱记忆(KnowledgeGraphMemory)

用图谱结构存储实体和关系,支持多跳推理。这是Agent记忆架构中最强大的模式。

from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class Entity:
    name: str
    entity_type: str
    properties: dict = field(default_factory=dict)


@dataclass
class Relation:
    source: str
    target: str
    relation_type: str
    properties: dict = field(default_factory=dict)


class KnowledgeGraphMemory:
    def __init__(self):
        self.entities: dict[str, Entity] = {}
        self.relations: list[Relation] = []
        self._adjacency: dict[str, list[Relation]] = defaultdict(list)

    def add_entity(self, name: str, entity_type: str,
                   properties: dict | None = None) -> Entity:
        entity = Entity(name=name, entity_type=entity_type,
                        properties=properties or {})
        self.entities[name] = entity
        return entity

    def add_relation(self, source: str, target: str,
                     relation_type: str,
                     properties: dict | None = None) -> Relation:
        relation = Relation(source=source, target=target,
                            relation_type=relation_type,
                            properties=properties or {})
        self.relations.append(relation)
        self._adjacency[source].append(relation)
        self._adjacency[target].append(relation)
        return relation

    def get_entity(self, name: str) -> Entity | None:
        return self.entities.get(name)

    def get_relations_of(self, name: str) -> list[Relation]:
        return self._adjacency.get(name, [])

    def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
        visited = {start}
        current_level = {start}
        for _ in range(hops):
            next_level = set()
            for node in current_level:
                for rel in self._adjacency.get(node, []):
                    neighbor = rel.target if rel.source == node else rel.source
                    if neighbor not in visited:
                        next_level.add(neighbor)
                        visited.add(neighbor)
            current_level = next_level
        return visited

    def to_context_string(self, entity_name: str) -> str:
        entity = self.get_entity(entity_name)
        if not entity:
            return ""
        lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
        for rel in self.get_relations_of(entity_name):
            other = rel.target if rel.source == entity_name else rel.source
            lines.append(
                f"  - {rel.relation_type} -> {other} {rel.properties}"
            )
        return "\n".join(lines)


kg = KnowledgeGraphMemory()
kg.add_entity("张三", "用户", {"偏好语言": "Python", "级别": "高级"})
kg.add_entity("FastAPI", "框架", {"类型": "Web", "版本": "0.115"})
kg.add_entity("Docker", "工具", {"类型": "容器化"})
kg.add_relation("张三", "FastAPI", "使用", {"频率": "每天"})
kg.add_relation("张三", "Docker", "使用", {"频率": "每周"})
kg.add_relation("FastAPI", "Docker", "部署方式")

print(kg.to_context_string("张三"))
print(f"2跳关联: {kg.multi_hop_query('张三', hops=2)}")

适用场景:需要多跳推理的复杂Agent,如企业知识助手、医疗诊断Agent。


模式7:混合层级记忆(HybridHierarchicalMemory)

融合上述所有模式,按层级组织记忆。这是生产级Agent记忆的终极方案。

from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class MemoryConfig:
    short_term_window: int = 10
    summary_threshold: int = 8
    vector_top_k: int = 5
    kg_max_hops: int = 2
    importance_threshold: float = 0.7


class HybridHierarchicalMemory:
    def __init__(self, config: MemoryConfig | None = None):
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        self.short_term.add(role, content)
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str) -> None:
        self.knowledge_graph.add_relation(source, target, relation_type)

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        context_parts = []
        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })
        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })
        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })
        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context


hybrid = HybridHierarchicalMemory(MemoryConfig(
    short_term_window=5, summary_threshold=6,
    vector_top_k=3, importance_threshold=0.6,
))
hybrid.add_message("user", "帮我用FastAPI搭一个RAG服务", importance=0.8)
hybrid.add_message("assistant", "好的,我来帮你设计架构", importance=0.3)
hybrid.add_knowledge("FastAPI", "框架", {"异步": True})
hybrid.add_knowledge("RAG", "架构", {"类型": "检索增强生成"})
hybrid.add_knowledge_relation("FastAPI", "RAG", "实现框架")

ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    print(f"[{part['layer']}] {str(part['content'])[:100]}")

适用场景:企业级AI助手、需要全链路记忆的复杂Agent系统。


避坑指南:5个常见陷阱

陷阱1:无限制地存储对话历史

错误做法

class BadMemory:
    def __init__(self):
        self.history = []

    def add(self, msg: str):
        self.history.append(msg)

正确做法

class GoodMemory:
    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

    def _compress_old(self):
        old = self.history[:len(self.history) // 2]
        self.history = self.history[len(self.history) // 2:]

陷阱2:向量检索不做相似度阈值过滤

错误做法

results = vector_store.similarity_search(query, k=5)
for r in results:
    context += r.page_content

正确做法

SIMILARITY_THRESHOLD = 0.75

results = vector_store.similarity_search_with_score(query, k=10)
filtered = [r for r in results if r[1] >= SIMILARITY_THRESHOLD]
for doc, score in filtered[:5]:
    context += doc.page_content

陷阱3:记忆写入不做去重

错误做法

def save_memory(content: str):
    db.insert({"content": content})

正确做法

import hashlib


def save_memory(content: str, metadata: dict | None = None):
    content_hash = hashlib.md5(content.encode()).hexdigest()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": datetime.now()}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
        })

陷阱4:忽略记忆的时间衰减

错误做法

all_memories = db.get_all()
context = "\n".join(m["content"] for m in all_memories)

正确做法

from datetime import datetime, timedelta


def get_memories_with_decay(half_life_days: float = 30.0):
    now = datetime.now()
    memories = db.get_all()
    scored = []
    for m in memories:
        age_days = (now - m["created_at"]).days
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [m for m, s in scored[:10]]

陷阱5:摘要压缩丢失关键细节

错误做法

summary_prompt = "总结以下对话:" + conversation_text

正确做法

summary_prompt = (
    "请将以下对话历史压缩为摘要,必须保留:\n"
    "1. 用户明确表达的偏好和需求\n"
    "2. 已做出的重要决策和结论\n"
    "3. 涉及的具体数值、名称、日期\n"
    "4. 未解决或待跟进的问题\n\n"
    f"对话内容:\n{conversation_text}"
)

报错排查:10个常见错误

# 错误信息 原因 解决方案
1 Token limit exceeded 对话历史+系统提示超出模型上下文窗口 使用滑动窗口或摘要压缩记忆
2 Embedding dimension mismatch 查询向量与存储向量维度不一致 统一使用同一Embedding模型
3 Rate limit hit on vector DB 高频检索触发向量数据库限流 批量查询+本地缓存
4 Memory retrieval returns empty 向量索引未构建或数据未写入 检查写入是否commit,索引是否refresh
5 Context window too short for summary 摘要本身过长挤占了对话空间 限制摘要长度,分层压缩
6 Knowledge graph cycle detected 实体关系形成环路 添加关系时检测环路并拒绝
7 Stale memory causing wrong answers 记忆未更新,使用了过期信息 实现TTL机制和版本号管理
8 Concurrent write conflict 多Agent实例同时写入记忆 使用乐观锁或分布式锁
9 Embedding model timeout 大批量文本Embedding超时 分批处理,每批不超过100条
10 Memory leak in long-running agent 长时间运行的Agent内存持续增长 定期清理低重要性记忆,设置上限

进阶优化:3个关键技巧

1. 记忆分级存储策略

from enum import Enum
from datetime import datetime, timedelta


class MemoryTier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"


class TieredMemoryStorage:
    def __init__(self):
        self.hot: list[dict] = []
        self.warm: list[dict] = []
        self.cold: list[dict] = []

    def add(self, memory: dict) -> None:
        memory["tier"] = MemoryTier.HOT.value
        memory["access_count"] = 0
        memory["created_at"] = datetime.now()
        self.hot.append(memory)

    def access(self, index: int) -> dict | None:
        for tier in [self.hot, self.warm, self.cold]:
            for mem in tier:
                if mem.get("index") == index:
                    mem["access_count"] += 1
                    mem["last_accessed"] = datetime.now()
                    return mem
        return None

    def rebalance(self) -> None:
        now = datetime.now()
        for mem in self.hot[:]:
            if (now - mem["last_accessed"]) > timedelta(hours=1):
                self.hot.remove(mem)
                mem["tier"] = MemoryTier.WARM.value
                self.warm.append(mem)
        for mem in self.warm[:]:
            if (now - mem["last_accessed"]) > timedelta(days=7):
                self.warm.remove(mem)
                mem["tier"] = MemoryTier.COLD.value
                self.cold.append(mem)

2. 异步记忆写入

import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncMemoryWriter:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.pending.put(memory)

    async def flush(self) -> int:
        count = 0
        batch = []
        while not self.pending.empty():
            mem = await self.pending.get()
            batch.append(mem)
            count += 1
        if batch:
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                self.executor, self._batch_write, batch
            )
        return count

    @staticmethod
    def _batch_write(batch: list[dict]) -> None:
        for mem in batch:
            db.insert(mem)

    async def start_periodic_flush(self, interval: float = 5.0):
        while True:
            await asyncio.sleep(interval)
            await self.flush()

3. 记忆质量评分与自动淘汰

from datetime import datetime, timedelta


class MemoryQualityScorer:
    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        self.decay_rate = decay_rate
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        age_hours = (datetime.now() - memory.get(
            "created_at", datetime.now()
        )).total_seconds() / 3600
        return self.decay_rate ** age_hours

    def _frequency_score(self, memory: dict) -> float:
        count = memory.get("access_count", 0)
        return min(1.0, count / 10.0)

    def should_evict(self, memory: dict) -> bool:
        return self.score(memory) < self.min_score

对比分析:7种记忆模式全面对比

维度 对话缓冲 滑动窗口 摘要压缩 向量语义 情景记忆 知识图谱 混合层级
实现复杂度 ★☆☆ ★☆☆ ★★☆ ★★★ ★★☆ ★★★ ★★★★
Token效率 ★☆☆ ★★☆ ★★★ ★★★ ★★☆ ★★★ ★★★★
检索精度 ★★☆ ★★☆ ★★☆ ★★★★ ★★★ ★★★★ ★★★★★
多跳推理
跨会话
写入延迟 ~1ms ~1ms ~500ms ~50ms ~10ms ~10ms ~100ms
存储成本
适用规模 <20轮 <50轮 <200轮 10K+条 1K+条 10K+实体 无限制
典型场景 简单问答 代码调试 咨询对话 RAG增强 事件追踪 知识推理 企业助手

★越多表示该维度表现越好;✓支持 △部分支持 ✗不支持


总结展望

AI Agent记忆系统正在从"锦上添花"变为"不可或缺"。2026年的趋势:

  1. 原生记忆支持:LangGraph Memory、MemGPT等框架将记忆作为一等公民
  2. 多模态记忆:不仅记住文本,还记住图像、音频、视频上下文
  3. 联邦记忆:多Agent共享记忆池,同时保护隐私边界
  4. 自适应压缩:根据查询意图动态决定记忆压缩粒度
  5. 记忆审计:可追溯的记忆写入和召回日志,满足合规要求

选择记忆方案的原则:从简单开始,按需升级。先用滑动窗口跑通流程,遇到Token瓶颈加摘要压缩,需要跨会话就上向量数据库,多跳推理再引入知识图谱。别一上来就搞混合层级——那是最强大的方案,也是最复杂的。


在线工具推荐

本站提供浏览器本地工具,免注册即可试用 →

#AI Agent#记忆系统#LangGraph#向量数据库#生产部署#Python#2026#AI与大数据