Python AI Agent记忆系统:7种生产级长期记忆方案
你的AI Agent为什么总是"失忆"?
你花了3天调通的Agent,上线第一天用户就投诉:"我昨天告诉你的偏好呢?怎么又忘了?"这不是个例——AI Agent记忆系统是2026年生产落地的头号难题。大模型本身没有持久记忆,每次对话都是一张白纸。短期记忆撑不过一个会话,长期记忆又面临检索慢、存储贵、一致性差三座大山。
更扎心的是:很多团队用LangGraph搭了个Agent,把对话历史往列表里一塞就以为搞定了记忆。结果Token爆炸、成本飙升、关键信息被截断,Agent越聊越"傻"。Python Agent长期记忆不是加个数据库就完事的,它需要一套完整的架构设计。
核心概念速查
| 概念 | 说明 | 典型实现 |
|---|---|---|
| 短期记忆 | 当前会话内的上下文信息 | 对话历史列表、滑动窗口 |
| 长期记忆 | 跨会话持久化的知识和偏好 | 向量数据库、关系数据库 |
| 情景记忆 | 对特定事件和经历的回忆 | 时间索引+向量检索 |
| 语义记忆 | 对概念和知识的结构化理解 | 知识图谱、本体库 |
| 工作记忆 | 当前推理步骤中的临时信息 | Scratchpad、ReAct观察 |
| 向量检索 | 基于语义相似度的记忆召回 | Embedding+FAISS/Chroma |
| 记忆压缩 | 将冗长历史压缩为摘要 | LLM摘要、关键信息提取 |
问题分析:AI Agent记忆系统的5大挑战
| # | 挑战 | 具体表现 | 影响 |
|---|---|---|---|
| 1 | Token窗口溢出 | 对话历史超过模型上下文长度 | 关键信息被截断,Agent"失忆" |
| 2 | 检索精度不足 | 向量检索返回无关记忆 | Agent基于错误信息做决策 |
| 3 | 记忆一致性冲突 | 新旧记忆矛盾,无法判断谁对 | 输出自相矛盾,用户信任崩塌 |
| 4 | 冷启动问题 | 新用户无历史记忆可用 | 个性化体验差,留存率低 |
| 5 | 成本与延迟权衡 | 全量记忆检索慢且贵 | 响应超时或API账单爆炸 |
这5个问题环环相扣:为了解决Token溢出你压缩记忆,压缩导致信息丢失,丢失又加剧检索精度问题。生产级Agent记忆架构必须系统性地解决这些问题,而不是头痛医头。
分步实操:7种记忆实现模式
模式1:对话缓冲记忆(ConversationBufferMemory)
最简单的记忆模式——把所有对话历史原样存储。适合短对话场景。
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class Message:
    """One chat turn stored by the memory classes in this file.

    Attributes:
        role: Speaker label of the turn (the demos use "user" and "assistant").
        content: Raw text of the turn.
        timestamp: Creation time; defaults to ``datetime.now()`` at construction.
    """

    role: str
    content: str
    # default_factory so every instance gets its own construction time.
    timestamp: datetime = field(default_factory=datetime.now)
class ConversationBufferMemory:
    """Conversation memory that keeps every message verbatim.

    ``max_tokens`` is only the threshold reported by ``is_overflow``; the
    buffer itself is never trimmed automatically.
    """

    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        """Append one message to the end of the history."""
        entry = Message(role=role, content=content)
        self.messages.append(entry)

    def get_context(self) -> list[dict]:
        """Return the full history, oldest first, as role/content dicts."""
        context = []
        for msg in self.messages:
            context.append({"role": msg.role, "content": msg.content})
        return context

    def estimate_tokens(self) -> int:
        """Roughly estimate tokens as len(content) // 4, floored per message."""
        total = 0
        for msg in self.messages:
            total += len(msg.content) // 4
        return total

    def is_overflow(self) -> bool:
        """Return True when the estimate is strictly above ``max_tokens``."""
        return self.max_tokens < self.estimate_tokens()

    def clear(self) -> None:
        """Drop every stored message."""
        self.messages.clear()
# Demo: record three turns, then print the context and the rough token estimate.
memory = ConversationBufferMemory(max_tokens=4000)
for role, text in (
    ("user", "我喜欢Python,请用Python回答"),
    ("assistant", "好的,我会用Python来回答你的问题"),
    ("user", "帮我写一个快排算法"),
):
    memory.add(role, text)
print(memory.get_context())
print(f"Token估算: {memory.estimate_tokens()}, 溢出: {memory.is_overflow()}")
适用场景:客服机器人、简单问答,对话轮次 < 20。
模式2:滑动窗口记忆(SlidingWindowMemory)
只保留最近K轮对话,自动丢弃更早的历史。Token可控,但会丢失早期信息。
from collections import deque
class SlidingWindowMemory:
    """Short-term memory that keeps only the most recent ``window_size`` rounds.

    One round is counted as two messages (a user turn plus an assistant
    turn), so the buffer holds at most ``window_size * 2`` messages.
    """

    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        # maxlen makes the deque discard the oldest entry on overflow.
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        """Append a message; the bounded deque evicts the oldest one when full."""
        # No manual trimming needed: the deque can never exceed its maxlen.
        self.buffer.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        """Return every buffered message, oldest first, as role/content dicts."""
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        """Return the last ``k`` rounds (up to ``2 * k`` messages), oldest first.

        A non-positive ``k`` yields an empty list. Without this guard
        ``[-0:]`` would return the whole buffer.
        """
        if k <= 0:
            return []
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        """Return the number of messages currently buffered."""
        return len(self.buffer)
# Demo: push ten rounds through a five-round window, then inspect what is left.
window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    for role, label in (("user", "问题"), ("assistant", "回答")):
        window_memory.add(role, f"第{i+1}个{label}")
print(f"窗口大小: {window_memory.size()}")
print(f"最近2轮: {window_memory.get_recent(k=2)}")
适用场景:长对话场景,只关心近期上下文,如代码调试助手。
模式3:摘要压缩记忆(SummaryCompressedMemory)
用LLM将历史对话压缩为摘要,保留关键信息的同时大幅减少Token占用。这是LangGraph记忆管理的核心思路之一。
from openai import OpenAI
class SummaryCompressedMemory:
    """Memory that folds older turns into an LLM-written running summary.

    The newest messages are kept verbatim in ``recent_messages``. Once more
    than ``max_raw_messages`` are held, everything except the last two is
    sent to the chat model together with the current summary, and the reply
    becomes the new summary.
    """

    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        """Store a message and compress once the raw buffer exceeds its limit."""
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        """Merge all but the last two raw messages into ``self.summary``.

        Does nothing when there is nothing to merge. If the model returns no
        text, the summary and the raw messages are left untouched so that no
        history is lost; the next ``add`` will try again.
        """
        to_compress = self.recent_messages[:-2]
        if not to_compress:
            # Avoid a paid API call with an empty conversation.
            return
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in to_compress
        )
        prompt = (
            f"请将以下对话历史压缩为一段简洁的摘要,保留所有关键信息、"
            f"用户偏好和重要决策:\n\n{conversation_text}\n\n"
            f"当前已有摘要:{self.summary}\n\n请输出合并后的新摘要:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        new_summary = response.choices[0].message.content
        if not new_summary:
            # message.content may be None; never replace the summary with it.
            return
        self.summary = new_summary
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        """Return the summary (as a system message, if any) plus the raw messages."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"对话历史摘要:{self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context
# Demo: eight rounds against a six-message raw buffer, which forces compression.
# NOTE: "your-api-key" is a placeholder; compression calls the chat API.
summary_memory = SummaryCompressedMemory(
    max_raw_messages=6,
    api_key="your-api-key",
)
for i in range(8):
    question = f"我想了解Python的{i+1}号特性"
    answer = f"Python的{i+1}号特性是..."
    summary_memory.add("user", question)
    summary_memory.add("assistant", answer)
context_size = len(summary_memory.get_context())
print(f"上下文条目数: {context_size}")
适用场景:多轮深度对话、咨询类Agent,需要保留长期语义。
模式4:向量语义记忆(VectorSemanticMemory)
将记忆向量化存储,通过语义相似度检索相关记忆。这是向量数据库记忆的核心实现。
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class MemoryItem:
    """One stored memory together with its embedding vector.

    Attributes:
        content: Text of the memory.
        embedding: Vector compared against queries during search.
        timestamp: Creation time; defaults to ``datetime.now()``.
        metadata: Free-form extra data; a fresh dict per instance.
    """

    content: str
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    # default_factory avoids sharing one dict between instances.
    metadata: dict = field(default_factory=dict)
class VectorSemanticMemory:
def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
self.embedding_dim = embedding_dim
self.top_k = top_k
self.memories: list[MemoryItem] = []
def add(self, content: str, embedding: np.ndarray,
metadata: dict | None = None) -> None:
self.memories.append(MemoryItem(
content=content,
embedding=embedding,
metadata=metadata or {},
))
def search(self, query_embedding: np.ndarray,
top_k: int | None = None) -> list[dict]:
k = top_k or self.top_k
if not self.memories:
return []
scores = []
for mem in self.memories:
sim = float(np.dot(query_embedding, mem.embedding) /
(np.linalg.norm(query_embedding) *
np.linalg.norm(mem.embedding) + 1e-8))
scores.append((sim, mem))
scores.sort(key=lambda x: x[0], reverse=True)
return [
{
"content": mem.content,
"score": score,
"timestamp": mem.timestamp.isoformat(),
"metadata": mem.metadata,
}
for score, mem in scores[:k]
]
def delete_old(self, before: datetime) -> int:
original_len = len(self.memories)
self.memories = [m for m in self.memories if m.timestamp >= before]
return original_len - len(self.memories)
# Demo: five random unit vectors as stand-in embeddings, then one random query.
vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    fake_embedding = np.random.randn(128)
    fake_embedding = fake_embedding / np.linalg.norm(fake_embedding)
    vector_memory.add(
        f"用户偏好记录{i+1}:喜欢Python和Rust",
        fake_embedding,
        {"source": "chat", "turn": i},
    )
query = np.random.randn(128)
query = query / np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for hit in results:
    print(f"[{hit['score']:.4f}] {hit['content']}")
适用场景:RAG增强的Agent、个性化推荐、跨会话知识检索。生产环境建议用Chroma或Milvus替换内存存储。
模式5:情景记忆(EpisodicMemory)
记录Agent经历的特定事件,支持按时间和语义双重检索。
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class EmotionTag(Enum):
POSITIVE = "positive"
NEGATIVE = "negative"
NEUTRAL = "neutral"
@dataclass
class Episode:
event: str
context: str
timestamp: datetime = field(default_factory=datetime.now)
emotion: EmotionTag = EmotionTag.NEUTRAL
importance: float = 0.5
embedding: np.ndarray | None = None
class EpisodicMemory:
def __init__(self, max_episodes: int = 1000):
self.episodes: list[Episode] = []
self.max_episodes = max_episodes
def record(self, event: str, context: str,
emotion: EmotionTag = EmotionTag.NEUTRAL,
importance: float = 0.5,
embedding: np.ndarray | None = None) -> None:
self.episodes.append(Episode(
event=event, context=context, emotion=emotion,
importance=importance, embedding=embedding,
))
if len(self.episodes) > self.max_episodes:
self._evict()
def _evict(self) -> None:
self.episodes.sort(key=lambda e: e.importance, reverse=True)
self.episodes = self.episodes[:self.max_episodes]
def recall_by_time(self, start: datetime,
end: datetime) -> list[Episode]:
return [
ep for ep in self.episodes
if start <= ep.timestamp <= end
]
def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
return [ep for ep in self.episodes if ep.importance >= threshold]
def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
return [ep for ep in self.episodes if ep.emotion == emotion]
def get_recent(self, k: int = 5) -> list[Episode]:
return self.episodes[-k:]
# Demo: log one complaint and one success, then list the high-importance events.
episodic_mem = EpisodicMemory(max_episodes=100)
for episode_kwargs in (
    {
        "event": "用户反馈API响应慢",
        "context": "用户在高峰期调用/v2/predict接口",
        "emotion": EmotionTag.NEGATIVE,
        "importance": 0.9,
    },
    {
        "event": "用户完成首次部署",
        "context": "使用Docker Compose部署成功",
        "emotion": EmotionTag.POSITIVE,
        "importance": 0.7,
    },
):
    episodic_mem.record(**episode_kwargs)
important = episodic_mem.recall_by_importance(0.8)
print(f"重要事件数: {len(important)}")
for ep in important:
    print(f" [{ep.emotion.value}] {ep.event}")
适用场景:客服Agent记录用户投诉、运维Agent记录故障事件。
模式6:知识图谱记忆(KnowledgeGraphMemory)
用图谱结构存储实体和关系,支持多跳推理。这是单一记忆模式中推理能力最强的一种。
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class Entity:
name: str
entity_type: str
properties: dict = field(default_factory=dict)
@dataclass
class Relation:
source: str
target: str
relation_type: str
properties: dict = field(default_factory=dict)
class KnowledgeGraphMemory:
def __init__(self):
self.entities: dict[str, Entity] = {}
self.relations: list[Relation] = []
self._adjacency: dict[str, list[Relation]] = defaultdict(list)
def add_entity(self, name: str, entity_type: str,
properties: dict | None = None) -> Entity:
entity = Entity(name=name, entity_type=entity_type,
properties=properties or {})
self.entities[name] = entity
return entity
def add_relation(self, source: str, target: str,
relation_type: str,
properties: dict | None = None) -> Relation:
relation = Relation(source=source, target=target,
relation_type=relation_type,
properties=properties or {})
self.relations.append(relation)
self._adjacency[source].append(relation)
self._adjacency[target].append(relation)
return relation
def get_entity(self, name: str) -> Entity | None:
return self.entities.get(name)
def get_relations_of(self, name: str) -> list[Relation]:
return self._adjacency.get(name, [])
def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
visited = {start}
current_level = {start}
for _ in range(hops):
next_level = set()
for node in current_level:
for rel in self._adjacency.get(node, []):
neighbor = rel.target if rel.source == node else rel.source
if neighbor not in visited:
next_level.add(neighbor)
visited.add(neighbor)
current_level = next_level
return visited
def to_context_string(self, entity_name: str) -> str:
entity = self.get_entity(entity_name)
if not entity:
return ""
lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
for rel in self.get_relations_of(entity_name):
other = rel.target if rel.source == entity_name else rel.source
lines.append(
f" - {rel.relation_type} -> {other} {rel.properties}"
)
return "\n".join(lines)
# Demo: a three-node graph (user, framework, tool) and a two-hop lookup.
kg = KnowledgeGraphMemory()
for name, kind, props in (
    ("张三", "用户", {"偏好语言": "Python", "级别": "高级"}),
    ("FastAPI", "框架", {"类型": "Web", "版本": "0.115"}),
    ("Docker", "工具", {"类型": "容器化"}),
):
    kg.add_entity(name, kind, props)
for src, dst, rel_type, props in (
    ("张三", "FastAPI", "使用", {"频率": "每天"}),
    ("张三", "Docker", "使用", {"频率": "每周"}),
    ("FastAPI", "Docker", "部署方式", None),
):
    kg.add_relation(src, dst, rel_type, props)
print(kg.to_context_string("张三"))
print(f"2跳关联: {kg.multi_hop_query('张三', hops=2)}")
适用场景:需要多跳推理的复杂Agent,如企业知识助手、医疗诊断Agent。
模式7:混合层级记忆(HybridHierarchicalMemory)
融合上述所有模式,按层级组织记忆。这是生产级Agent记忆的终极方案。
from dataclasses import dataclass, field
from datetime import datetime, timedelta
@dataclass
class MemoryConfig:
    """Tuning knobs for the hybrid hierarchical memory.

    Attributes:
        short_term_window: Number of rounds kept by the sliding window.
        summary_threshold: Threshold intended for summarisation.
        vector_top_k: Default number of semantic search hits.
        kg_max_hops: Hop limit intended for knowledge-graph expansion.
        importance_threshold: Minimum importance for episodic recording/recall.
    """

    short_term_window: int = 10
    summary_threshold: int = 8
    vector_top_k: int = 5
    kg_max_hops: int = 2
    importance_threshold: float = 0.7
class HybridHierarchicalMemory:
    """Facade combining the sliding-window, vector, episodic and graph memories.

    Layers are returned by ``retrieve`` in a fixed order: short_term,
    summary, semantic, episodic. ``summary_cache`` is exposed but never
    written by this class; callers must set it themselves.
    ``config.summary_threshold`` and ``config.kg_max_hops`` are not read here.
    """

    def __init__(self, config: MemoryConfig | None = None):
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        """Store a turn in short-term memory; also log it as an episode if important."""
        self.short_term.add(role, content)
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        """Add an embedded memory to the semantic layer."""
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        """Register an entity in the knowledge graph."""
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str,
                               properties: dict | None = None) -> None:
        """Register a relation in the knowledge graph.

        ``properties`` is forwarded to the graph, mirroring ``add_knowledge``;
        omitting it keeps the previous behaviour.
        """
        self.knowledge_graph.add_relation(
            source, target, relation_type, properties
        )

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        """Collect context from every non-empty layer.

        Args:
            query: Currently unused; kept for interface compatibility.
            query_embedding: When given, enables the semantic layer.

        Returns:
            A list of ``{"layer": name, "content": ...}`` dicts.
        """
        context_parts = []

        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })

        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })

        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })

        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                # Only the last five matches are surfaced.
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        """Return ``retrieve`` output plus a knowledge-graph layer for ``focus_entity``."""
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context
# Demo: two chat turns plus a two-entity graph, then print each context layer.
demo_config = MemoryConfig(
    short_term_window=5,
    summary_threshold=6,
    vector_top_k=3,
    importance_threshold=0.6,
)
hybrid = HybridHierarchicalMemory(demo_config)
hybrid.add_message("user", "帮我用FastAPI搭一个RAG服务", importance=0.8)
hybrid.add_message("assistant", "好的,我来帮你设计架构", importance=0.3)
for entity_name, entity_type, props in (
    ("FastAPI", "框架", {"异步": True}),
    ("RAG", "架构", {"类型": "检索增强生成"}),
):
    hybrid.add_knowledge(entity_name, entity_type, props)
hybrid.add_knowledge_relation("FastAPI", "RAG", "实现框架")
ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    preview = str(part['content'])[:100]
    print(f"[{part['layer']}] {preview}")
适用场景:企业级AI助手、需要全链路记忆的复杂Agent系统。
避坑指南:5个常见陷阱
陷阱1:无限制地存储对话历史
❌ 错误做法:
class BadMemory:
    """Anti-pattern example: a history list that grows without any bound."""

    def __init__(self):
        self.history = []

    def add(self, msg: str):
        """Append ``msg``; nothing is ever trimmed (this is the flaw shown)."""
        self.history += [msg]
✅ 正确做法:
class GoodMemory:
    """History list capped at ``max_messages`` entries.

    When the cap is exceeded, the older half of the history is discarded.
    """

    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        """Append ``msg`` and trim the history if it grew past the cap."""
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

    def _compress_old(self):
        """Drop the older half of the history.

        NOTE: the dropped half is discarded, not summarised; hook a
        summariser in here if that content must be preserved.
        """
        midpoint = len(self.history) // 2
        self.history = self.history[midpoint:]
陷阱2:向量检索不做相似度阈值过滤
❌ 错误做法:
# Anti-pattern (kept for illustration): every hit is used, however weak the match.
results = vector_store.similarity_search(query, k=5)
context += "".join(r.page_content for r in results)
✅ 正确做法:
SIMILARITY_THRESHOLD = 0.75
# Over-fetch, then keep only confident matches.
# NOTE(review): assumes vector_store is a LangChain-style VectorStore — confirm.
# similarity_search_with_score returns the store's raw score, which for many
# back ends is a distance (lower is better), so ">= threshold" could keep the
# worst hits. Relevance scores are normalised to [0, 1], higher is better.
results = vector_store.similarity_search_with_relevance_scores(query, k=10)
filtered = [
    (doc, score) for doc, score in results if score >= SIMILARITY_THRESHOLD
]
context += "".join(doc.page_content for doc, _ in filtered[:5])
陷阱3:记忆写入不做去重
❌ 错误做法:
def save_memory(content: str):
    """Anti-pattern example: insert ``content`` with no duplicate check."""
    record = {"content": content}
    db.insert(record)
✅ 正确做法:
import hashlib
def save_memory(content: str, metadata: dict | None = None):
    """Insert ``content`` once; for a repeat, only refresh its ``updated_at``.

    Duplicates are detected by an MD5 digest of the UTF-8 text. The digest
    is a dedup key, not a security measure. New records carry ``created_at``
    and ``updated_at`` so that time-decay ranking can read them.
    """
    content_hash = hashlib.md5(content.encode()).hexdigest()
    now = datetime.now()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": now}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
            "created_at": now,
            "updated_at": now,
        })
陷阱4:忽略记忆的时间衰减
❌ 错误做法:
# Anti-pattern (kept for illustration): every memory is used, regardless of age.
all_memories = db.get_all()
memory_texts = [m["content"] for m in all_memories]
context = "\n".join(memory_texts)
✅ 正确做法:
from datetime import datetime, timedelta
def get_memories_with_decay(half_life_days: float = 30.0, limit: int = 10):
    """Return the top memories ranked by importance with exponential time decay.

    Args:
        half_life_days: Age, in days, at which a memory's weight halves.
        limit: Maximum number of memories to return (default 10).

    Returns:
        Memories sorted by ``importance * 0.5 ** (age / half_life)``, best first.

    Raises:
        ValueError: If ``half_life_days`` is not positive.
    """
    if half_life_days <= 0:
        raise ValueError("half_life_days must be positive")
    now = datetime.now()
    scored = []
    for m in db.get_all():
        # Fractional days keep ordering meaningful within a single day;
        # clamping at 0 stops future timestamps from boosting the score.
        age_days = max((now - m["created_at"]).total_seconds() / 86400.0, 0.0)
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [m for m, _ in scored[:max(limit, 0)]]
陷阱5:摘要压缩丢失关键细节
❌ 错误做法:
# Anti-pattern (kept for illustration): the prompt does not say what to preserve.
summary_prompt = "".join(("总结以下对话:", conversation_text))
✅ 正确做法:
# The prompt lists explicitly what the summary must retain.
summary_prompt = "\n".join([
    "请将以下对话历史压缩为摘要,必须保留:",
    "1. 用户明确表达的偏好和需求",
    "2. 已做出的重要决策和结论",
    "3. 涉及的具体数值、名称、日期",
    "4. 未解决或待跟进的问题",
    "",
    f"对话内容:\n{conversation_text}",
])
报错排查:10个常见错误
| # | 错误信息 | 原因 | 解决方案 |
|---|---|---|---|
| 1 | Token limit exceeded | 对话历史+系统提示超出模型上下文窗口 | 使用滑动窗口或摘要压缩记忆 |
| 2 | Embedding dimension mismatch | 查询向量与存储向量维度不一致 | 统一使用同一Embedding模型 |
| 3 | Rate limit hit on vector DB | 高频检索触发向量数据库限流 | 批量查询+本地缓存 |
| 4 | Memory retrieval returns empty | 向量索引未构建或数据未写入 | 检查写入是否commit,索引是否refresh |
| 5 | Context window too short for summary | 摘要本身过长挤占了对话空间 | 限制摘要长度,分层压缩 |
| 6 | Knowledge graph cycle detected | 实体关系形成环路 | 添加关系时检测环路并拒绝 |
| 7 | Stale memory causing wrong answers | 记忆未更新,使用了过期信息 | 实现TTL机制和版本号管理 |
| 8 | Concurrent write conflict | 多Agent实例同时写入记忆 | 使用乐观锁或分布式锁 |
| 9 | Embedding model timeout | 大批量文本Embedding超时 | 分批处理,每批不超过100条 |
| 10 | Memory leak in long-running agent | 长时间运行的Agent内存持续增长 | 定期清理低重要性记忆,设置上限 |
进阶优化:3个关键技巧
1. 记忆分级存储策略
from enum import Enum
from datetime import datetime, timedelta
class MemoryTier(Enum):
HOT = "hot"
WARM = "warm"
COLD = "cold"
class TieredMemoryStorage:
def __init__(self):
self.hot: list[dict] = []
self.warm: list[dict] = []
self.cold: list[dict] = []
def add(self, memory: dict) -> None:
memory["tier"] = MemoryTier.HOT.value
memory["access_count"] = 0
memory["created_at"] = datetime.now()
self.hot.append(memory)
def access(self, index: int) -> dict | None:
for tier in [self.hot, self.warm, self.cold]:
for mem in tier:
if mem.get("index") == index:
mem["access_count"] += 1
mem["last_accessed"] = datetime.now()
return mem
return None
def rebalance(self) -> None:
now = datetime.now()
for mem in self.hot[:]:
if (now - mem["last_accessed"]) > timedelta(hours=1):
self.hot.remove(mem)
mem["tier"] = MemoryTier.WARM.value
self.warm.append(mem)
for mem in self.warm[:]:
if (now - mem["last_accessed"]) > timedelta(days=7):
self.warm.remove(mem)
mem["tier"] = MemoryTier.COLD.value
self.cold.append(mem)
2. 异步记忆写入
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncMemoryWriter:
    """Queue memory writes and persist them in batches off the event loop."""

    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        """Enqueue ``memory``; nothing is persisted until ``flush`` runs."""
        await self.pending.put(memory)

    async def flush(self) -> int:
        """Persist everything currently queued and return how many were written.

        The batch is written in the thread pool so the event loop is not
        blocked. If the write raises, the batch is put back on the queue
        before the exception propagates, so queued memories are not lost.
        """
        # There is no await between qsize() and the get_nowait() calls, so
        # the queue cannot change underneath this loop.
        batch = [self.pending.get_nowait()
                 for _ in range(self.pending.qsize())]
        if not batch:
            return 0
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                self.executor, self._batch_write, batch
            )
        except Exception:
            for mem in batch:
                self.pending.put_nowait(mem)
            raise
        return len(batch)

    @staticmethod
    def _batch_write(batch: list[dict]) -> None:
        """Insert each memory via ``db``; runs in a worker thread."""
        for mem in batch:
            db.insert(mem)

    async def start_periodic_flush(self, interval: float = 5.0):
        """Flush every ``interval`` seconds until cancelled or a flush fails.

        Items queued after the last tick are not flushed on cancellation;
        call ``flush`` once more during shutdown.
        """
        while True:
            await asyncio.sleep(interval)
            await self.flush()
3. 记忆质量评分与自动淘汰
from datetime import datetime, timedelta
class MemoryQualityScorer:
    """Score memories by recency, access frequency, importance and relevance."""

    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        self.decay_rate = decay_rate
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        """Return the weighted quality score of ``memory``.

        Weights: 0.3 recency, 0.2 frequency, 0.3 importance, 0.2 relevance.
        Missing ``importance`` and ``relevance`` default to 0.5.
        """
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        """Return ``decay_rate ** age_in_hours``.

        A missing or future ``created_at`` counts as age zero, so a skewed
        clock cannot push the score above that of a brand-new memory.
        """
        now = datetime.now()
        created_at = memory.get("created_at", now)
        age_hours = max((now - created_at).total_seconds() / 3600, 0.0)
        return self.decay_rate ** age_hours

    def _frequency_score(self, memory: dict) -> float:
        """Return ``access_count / 10`` capped at 1.0 (0 when the key is missing)."""
        count = memory.get("access_count", 0)
        return min(1.0, count / 10.0)

    def should_evict(self, memory: dict) -> bool:
        """Return True when the memory scores below ``min_score``."""
        return self.score(memory) < self.min_score
对比分析:7种记忆模式全面对比
| 维度 | 对话缓冲 | 滑动窗口 | 摘要压缩 | 向量语义 | 情景记忆 | 知识图谱 | 混合层级 |
|---|---|---|---|---|---|---|---|
| 实现复杂度 | ★☆☆ | ★☆☆ | ★★☆ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| Token效率 | ★☆☆ | ★★☆ | ★★★ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| 检索精度 | ★★☆ | ★★☆ | ★★☆ | ★★★★ | ★★★ | ★★★★ | ★★★★★ |
| 多跳推理 | ✗ | ✗ | ✗ | △ | ✗ | ✓ | ✓ |
| 跨会话 | ✗ | ✗ | △ | ✓ | ✓ | ✓ | ✓ |
| 写入延迟 | ~1ms | ~1ms | ~500ms | ~50ms | ~10ms | ~10ms | ~100ms |
| 存储成本 | 低 | 低 | 低 | 中 | 中 | 中 | 高 |
| 适用规模 | <20轮 | <50轮 | <200轮 | 10K+条 | 1K+条 | 10K+实体 | 无限制 |
| 典型场景 | 简单问答 | 代码调试 | 咨询对话 | RAG增强 | 事件追踪 | 知识推理 | 企业助手 |
★越多表示该维度数值越高:"实现复杂度"一行★越多代表越复杂(成本越高),其余维度★越多代表表现越好;✓支持 △部分支持 ✗不支持
总结展望
AI Agent记忆系统正在从"锦上添花"变为"不可或缺"。2026年的趋势:
- 原生记忆支持:LangGraph Memory、MemGPT等框架将记忆作为一等公民
- 多模态记忆:不仅记住文本,还记住图像、音频、视频上下文
- 联邦记忆:多Agent共享记忆池,同时保护隐私边界
- 自适应压缩:根据查询意图动态决定记忆压缩粒度
- 记忆审计:可追溯的记忆写入和召回日志,满足合规要求
选择记忆方案的原则:从简单开始,按需升级。先用滑动窗口跑通流程,遇到Token瓶颈加摘要压缩,需要跨会话就上向量数据库,多跳推理再引入知识图谱。别一上来就搞混合层级——那是最强大的方案,也是最复杂的。
在线工具推荐
本站提供浏览器本地工具,免注册即可试用 →