Python AI Agent記憶系統:7種生產級長期記憶方案
你的AI Agent為什麼總是「失憶」?
你花了3天調通的Agent,上線第一天使用者就投訴:「我昨天告訴你的偏好呢?怎麼又忘了?」這不是個例——AI Agent記憶系統是2026年生產落地的頭號難題。大模型本身沒有持久記憶,每次對話都是一張白紙。短期記憶撐不過一個工作階段,長期記憶又面臨檢索慢、儲存貴、一致性差三座大山。
更扎心的是:很多團隊用LangGraph搭了個Agent,把對話歷史往串列裡一塞就以為搞定了記憶。結果Token爆炸、成本飆升、關鍵資訊被截斷,Agent越聊越「傻」。Python Agent長期記憶不是加個資料庫就完事的,它需要一套完整的架構設計。
核心概念速查
| 概念 | 說明 | 典型實作 |
|---|---|---|
| 短期記憶 | 當前工作階段內的上下文資訊 | 對話歷史串列、滑動視窗 |
| 長期記憶 | 跨工作階段持久化的知識和偏好 | 向量資料庫、關聯式資料庫 |
| 情景記憶 | 對特定事件和經歷的回憶 | 時間索引+向量檢索 |
| 語義記憶 | 對概念和知識的結構化理解 | 知識圖譜、本體庫 |
| 工作記憶 | 當前推理步驟中的臨時資訊 | Scratchpad、ReAct觀察 |
| 向量檢索 | 基於語義相似度的記憶召回 | Embedding+FAISS/Chroma |
| 記憶壓縮 | 將冗長歷史壓縮為摘要 | LLM摘要、關鍵資訊提取 |
問題分析:AI Agent記憶系統的5大挑戰
| # | 挑戰 | 具體表現 | 影響 |
|---|---|---|---|
| 1 | Token視窗溢出 | 對話歷史超過模型上下文長度 | 關鍵資訊被截斷,Agent「失憶」 |
| 2 | 檢索精度不足 | 向量檢索回傳無關記憶 | Agent基於錯誤資訊做決策 |
| 3 | 記憶一致性衝突 | 新舊記憶矛盾,無法判斷誰對 | 輸出自相矛盾,使用者信任崩塌 |
| 4 | 冷啟動問題 | 新使用者無歷史記憶可用 | 個人化體驗差,留存率低 |
| 5 | 成本與延遲權衡 | 全量記憶檢索慢且貴 | 回應逾時或API帳單爆炸 |
這5個問題環環相扣:為了解決Token溢出你壓縮記憶,壓縮導致資訊遺失,遺失又加劇檢索精度問題。生產級Agent記憶架構必須系統性地解決這些問題,而不是頭痛醫頭。
分步實作:7種記憶實作模式
模式1:對話緩衝記憶(ConversationBufferMemory)
最簡單的記憶模式——把所有對話歷史原樣儲存。適合短對話場景。
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class Message:
    """A single chat message stored by the memory classes.

    Attributes:
        role: Speaker of the message (the examples use "user" and "assistant").
        content: Raw message text.
        timestamp: Creation time; defaults to ``datetime.now()`` at construction.
    """

    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
class ConversationBufferMemory:
    """Keep the full conversation history verbatim.

    Nothing is ever dropped automatically; callers are expected to check
    ``is_overflow()`` and decide what to do (e.g. ``clear()``).
    """

    def __init__(self, max_tokens: int = 4000):
        """Create an empty buffer with a soft token budget of *max_tokens*."""
        self.messages: list[Message] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str) -> None:
        """Append one message to the history."""
        self.messages.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        """Return the history as ``{"role", "content"}`` dicts, oldest first."""
        return [{"role": m.role, "content": m.content} for m in self.messages]

    def estimate_tokens(self) -> int:
        """Return a rough token estimate (characters // 4, per message).

        This is only a heuristic: the integer division is applied to each
        message separately, so messages shorter than four characters count
        as zero.
        """
        return sum(len(m.content) // 4 for m in self.messages)

    def is_overflow(self) -> bool:
        """Return True when the estimate exceeds ``max_tokens``."""
        return self.estimate_tokens() > self.max_tokens

    def clear(self) -> None:
        """Remove every stored message."""
        self.messages.clear()
# Demo: store three turns verbatim, then print the chat context and the
# rough token estimate together with the overflow flag.
memory = ConversationBufferMemory(max_tokens=4000)
memory.add("user", "我喜歡Python,請用Python回答")
memory.add("assistant", "好的,我會用Python來回答你的問題")
memory.add("user", "幫我寫一個快排演算法")
print(memory.get_context())
print(f"Token估算: {memory.estimate_tokens()}, 溢出: {memory.is_overflow()}")
適用場景:客服機器人、簡單問答,對話輪次 < 20。
模式2:滑動視窗記憶(SlidingWindowMemory)
只保留最近K輪對話,自動丟棄更早的歷史。Token可控,但會遺失早期資訊。
from collections import deque
class SlidingWindowMemory:
    """Short-term memory that keeps only the most recent conversation rounds.

    One round is counted as two messages (a user message plus the assistant
    reply), so the buffer holds at most ``window_size * 2`` messages.
    """

    def __init__(self, window_size: int = 10):
        """Create a window that retains the last *window_size* rounds."""
        self.window_size = window_size
        # The bounded deque discards the oldest message by itself once full,
        # so no manual trimming is needed in ``add``.
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        """Append a message; the oldest one is evicted when the window is full."""
        self.buffer.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        """Return every buffered message as ``{"role", "content"}`` dicts."""
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        """Return the last *k* rounds (``2 * k`` messages), oldest first.

        A non-positive *k* yields an empty list. Without this guard the
        slice ``[-0:]`` would return the whole buffer.
        """
        if k <= 0:
            return []
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        """Return the number of messages currently buffered."""
        return len(self.buffer)
# Demo: push 10 rounds through a 5-round window, then inspect what is left.
window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    window_memory.add("user", f"第{i+1}個問題")
    window_memory.add("assistant", f"第{i+1}個回答")
print(f"視窗大小: {window_memory.size()}")
print(f"最近2輪: {window_memory.get_recent(k=2)}")
適用場景:長對話場景,只關心近期上下文,如程式除錯助手。
模式3:摘要壓縮記憶(SummaryCompressedMemory)
用LLM將歷史對話壓縮為摘要,保留關鍵資訊的同時大幅減少Token佔用。這是LangGraph記憶管理的核心思路之一。
from openai import OpenAI
class SummaryCompressedMemory:
    """Keep a rolling LLM-written summary plus the most recent raw messages.

    When more than ``max_raw_messages`` raw messages have accumulated, all
    but the last two are folded into ``summary`` through a chat-completion
    call.
    """

    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        """Create the OpenAI client and start with an empty summary."""
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        """Append a message and compress once the raw buffer is too long."""
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        """Fold everything except the last two messages into the summary."""
        to_compress = self.recent_messages[:-2]
        if not to_compress:
            # Nothing older than the two messages we always keep, so there
            # is no reason to pay for an LLM call with an empty transcript.
            return
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in to_compress
        )
        prompt = (
            f"請將以下對話歷史壓縮為一段簡潔的摘要,保留所有關鍵資訊、"
            f"使用者偏好和重要決策:\n\n{conversation_text}\n\n"
            f"當前已有摘要:{self.summary}\n\n請輸出合併後的新摘要:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        new_summary = response.choices[0].message.content
        if not new_summary:
            # An empty/None completion must not wipe the existing summary or
            # discard the raw messages; keep both and retry on a later add().
            return
        self.summary = new_summary
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        """Return the summary (as a system message, if any) plus raw messages."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"對話歷史摘要:{self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context
# Demo: 16 messages against a raw-message limit of 6. Once the limit is
# exceeded, add() triggers chat-completion calls through the OpenAI client,
# so a real API key is needed in place of the placeholder.
summary_memory = SummaryCompressedMemory(
    api_key="your-api-key", max_raw_messages=6
)
for i in range(8):
    summary_memory.add("user", f"我想了解Python的{i+1}號特性")
    summary_memory.add("assistant", f"Python的{i+1}號特性是...")
print(f"上下文條目數: {len(summary_memory.get_context())}")
適用場景:多輪深度對話、諮詢類Agent,需要保留長期語義。
模式4:向量語義記憶(VectorSemanticMemory)
將記憶向量化儲存,透過語義相似度檢索相關記憶。這是向量資料庫記憶的核心實作。
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class MemoryItem:
    """One stored memory together with its embedding vector.

    Attributes:
        content: Text of the memory.
        embedding: Vector representation used for similarity search.
        timestamp: Creation time; defaults to ``datetime.now()``.
        metadata: Free-form extra information; defaults to a fresh dict.
    """

    content: str
    # NOTE: the dataclass-generated __eq__ compares this field with ``==``,
    # which NumPy evaluates element-wise; comparing two items that hold
    # different multi-element arrays may therefore raise ValueError.
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)
class VectorSemanticMemory:
def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
self.embedding_dim = embedding_dim
self.top_k = top_k
self.memories: list[MemoryItem] = []
def add(self, content: str, embedding: np.ndarray,
metadata: dict | None = None) -> None:
self.memories.append(MemoryItem(
content=content,
embedding=embedding,
metadata=metadata or {},
))
def search(self, query_embedding: np.ndarray,
top_k: int | None = None) -> list[dict]:
k = top_k or self.top_k
if not self.memories:
return []
scores = []
for mem in self.memories:
sim = float(np.dot(query_embedding, mem.embedding) /
(np.linalg.norm(query_embedding) *
np.linalg.norm(mem.embedding) + 1e-8))
scores.append((sim, mem))
scores.sort(key=lambda x: x[0], reverse=True)
return [
{
"content": mem.content,
"score": score,
"timestamp": mem.timestamp.isoformat(),
"metadata": mem.metadata,
}
for score, mem in scores[:k]
]
def delete_old(self, before: datetime) -> int:
original_len = len(self.memories)
self.memories = [m for m in self.memories if m.timestamp >= before]
return original_len - len(self.memories)
# Demo: fill the store with random unit vectors standing in for real
# embeddings, then run one similarity query and print the top matches.
vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    fake_embedding = np.random.randn(128)
    fake_embedding /= np.linalg.norm(fake_embedding)
    vector_memory.add(
        content=f"使用者偏好記錄{i+1}:喜歡Python和Rust",
        embedding=fake_embedding,
        metadata={"source": "chat", "turn": i},
    )
query = np.random.randn(128)
query /= np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for r in results:
    print(f"[{r['score']:.4f}] {r['content']}")
適用場景:RAG增強的Agent、個人化推薦、跨工作階段知識檢索。生產環境建議用Chroma或Milvus替換記憶體儲存。
模式5:情景記憶(EpisodicMemory)
記錄Agent經歷的特定事件,支援按時間和語義雙重檢索。
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class EmotionTag(Enum):
POSITIVE = "positive"
NEGATIVE = "negative"
NEUTRAL = "neutral"
@dataclass
class Episode:
event: str
context: str
timestamp: datetime = field(default_factory=datetime.now)
emotion: EmotionTag = EmotionTag.NEUTRAL
importance: float = 0.5
embedding: np.ndarray | None = None
class EpisodicMemory:
def __init__(self, max_episodes: int = 1000):
self.episodes: list[Episode] = []
self.max_episodes = max_episodes
def record(self, event: str, context: str,
emotion: EmotionTag = EmotionTag.NEUTRAL,
importance: float = 0.5,
embedding: np.ndarray | None = None) -> None:
self.episodes.append(Episode(
event=event, context=context, emotion=emotion,
importance=importance, embedding=embedding,
))
if len(self.episodes) > self.max_episodes:
self._evict()
def _evict(self) -> None:
self.episodes.sort(key=lambda e: e.importance, reverse=True)
self.episodes = self.episodes[:self.max_episodes]
def recall_by_time(self, start: datetime,
end: datetime) -> list[Episode]:
return [
ep for ep in self.episodes
if start <= ep.timestamp <= end
]
def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
return [ep for ep in self.episodes if ep.importance >= threshold]
def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
return [ep for ep in self.episodes if ep.emotion == emotion]
def get_recent(self, k: int = 5) -> list[Episode]:
return self.episodes[-k:]
# Demo: record one negative and one positive event, then list the episodes
# whose importance is at least 0.8.
episodic_mem = EpisodicMemory(max_episodes=100)
episodic_mem.record(
    event="使用者回饋API回應慢",
    context="使用者在高峰期呼叫/v2/predict介面",
    emotion=EmotionTag.NEGATIVE,
    importance=0.9,
)
episodic_mem.record(
    event="使用者完成首次部署",
    context="使用Docker Compose部署成功",
    emotion=EmotionTag.POSITIVE,
    importance=0.7,
)
important = episodic_mem.recall_by_importance(0.8)
print(f"重要事件數: {len(important)}")
for ep in important:
    print(f" [{ep.emotion.value}] {ep.event}")
適用場景:客服Agent記錄使用者投訴、維運Agent記錄故障事件。
模式6:知識圖譜記憶(KnowledgeGraphMemory)
用圖譜結構儲存實體和關係,支援多跳推理。這是Agent記憶架構中最強大的模式。
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class Entity:
name: str
entity_type: str
properties: dict = field(default_factory=dict)
@dataclass
class Relation:
source: str
target: str
relation_type: str
properties: dict = field(default_factory=dict)
class KnowledgeGraphMemory:
def __init__(self):
self.entities: dict[str, Entity] = {}
self.relations: list[Relation] = []
self._adjacency: dict[str, list[Relation]] = defaultdict(list)
def add_entity(self, name: str, entity_type: str,
properties: dict | None = None) -> Entity:
entity = Entity(name=name, entity_type=entity_type,
properties=properties or {})
self.entities[name] = entity
return entity
def add_relation(self, source: str, target: str,
relation_type: str,
properties: dict | None = None) -> Relation:
relation = Relation(source=source, target=target,
relation_type=relation_type,
properties=properties or {})
self.relations.append(relation)
self._adjacency[source].append(relation)
self._adjacency[target].append(relation)
return relation
def get_entity(self, name: str) -> Entity | None:
return self.entities.get(name)
def get_relations_of(self, name: str) -> list[Relation]:
return self._adjacency.get(name, [])
def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
visited = {start}
current_level = {start}
for _ in range(hops):
next_level = set()
for node in current_level:
for rel in self._adjacency.get(node, []):
neighbor = rel.target if rel.source == node else rel.source
if neighbor not in visited:
next_level.add(neighbor)
visited.add(neighbor)
current_level = next_level
return visited
def to_context_string(self, entity_name: str) -> str:
entity = self.get_entity(entity_name)
if not entity:
return ""
lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
for rel in self.get_relations_of(entity_name):
other = rel.target if rel.source == entity_name else rel.source
lines.append(
f" - {rel.relation_type} -> {other} {rel.properties}"
)
return "\n".join(lines)
# Demo: build a three-node graph (one user, two technologies), print the
# user's context string, then list everything reachable within two hops.
kg = KnowledgeGraphMemory()
kg.add_entity("張三", "使用者", {"偏好語言": "Python", "級別": "進階"})
kg.add_entity("FastAPI", "框架", {"類型": "Web", "版本": "0.115"})
kg.add_entity("Docker", "工具", {"類型": "容器化"})
kg.add_relation("張三", "FastAPI", "使用", {"頻率": "每天"})
kg.add_relation("張三", "Docker", "使用", {"頻率": "每週"})
kg.add_relation("FastAPI", "Docker", "部署方式")
print(kg.to_context_string("張三"))
print(f"2跳關聯: {kg.multi_hop_query('張三', hops=2)}")
適用場景:需要多跳推理的複雜Agent,如企業知識助手、醫療診斷Agent。
模式7:混合層級記憶(HybridHierarchicalMemory)
融合上述所有模式,按層級組織記憶。這是生產級Agent記憶的終極方案。
from dataclasses import dataclass, field
from datetime import datetime, timedelta
@dataclass
class MemoryConfig:
    """Tunable settings for ``HybridHierarchicalMemory``.

    Attributes:
        short_term_window: Rounds kept by the sliding-window layer.
        summary_threshold: Threshold intended for the summary layer.
        vector_top_k: Default result count for semantic search.
        kg_max_hops: Hop limit intended for knowledge-graph traversal.
        importance_threshold: Minimum importance for a message to be
            recorded as an episode and for episodes to be recalled.
    """

    short_term_window: int = 10
    summary_threshold: int = 8
    vector_top_k: int = 5
    kg_max_hops: int = 2
    importance_threshold: float = 0.7
class HybridHierarchicalMemory:
    """Facade combining short-term, semantic, episodic and graph memory.

    Each layer is one of the memory classes defined earlier in this file;
    ``retrieve`` / ``get_full_context`` collect what every layer has to
    offer into a list of ``{"layer", "content"}`` dicts.
    """

    def __init__(self, config: MemoryConfig | None = None):
        """Build all layers from *config* (defaults to ``MemoryConfig()``)."""
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        # Nothing in this class writes the summary; it is only emitted by
        # retrieve() when a caller has set it.
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        """Store a message; important ones are also recorded as episodes."""
        self.short_term.add(role, content)
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        """Add an embedded memory to the semantic layer."""
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        """Add (or overwrite) an entity in the knowledge graph."""
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str,
                               properties: dict | None = None) -> None:
        """Add a relation to the knowledge graph.

        *properties* is optional and forwarded unchanged, mirroring
        ``KnowledgeGraphMemory.add_relation``.
        """
        self.knowledge_graph.add_relation(
            source, target, relation_type, properties
        )

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        """Collect context from the short-term, summary, semantic and
        episodic layers.

        Args:
            query: Currently unused; accepted for interface compatibility.
            query_embedding: When given, enables the semantic layer.

        Returns:
            One ``{"layer", "content"}`` dict per layer that has content.
        """
        context_parts = []
        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })
        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })
        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })
        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    # Only the last five matching episodes are included.
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        """Return ``retrieve()`` output plus the graph view of *focus_entity*."""
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context
# Demo: with importance_threshold=0.6 only the first message (0.8) is also
# recorded as an episode; the knowledge-graph layer is added because a
# focus entity is supplied.
hybrid = HybridHierarchicalMemory(MemoryConfig(
    short_term_window=5, summary_threshold=6,
    vector_top_k=3, importance_threshold=0.6,
))
hybrid.add_message("user", "幫我用FastAPI搭一個RAG服務", importance=0.8)
hybrid.add_message("assistant", "好的,我來幫你設計架構", importance=0.3)
hybrid.add_knowledge("FastAPI", "框架", {"非同步": True})
hybrid.add_knowledge("RAG", "架構", {"類型": "檢索增強生成"})
hybrid.add_knowledge_relation("FastAPI", "RAG", "實作框架")
ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    print(f"[{part['layer']}] {str(part['content'])[:100]}")
適用場景:企業級AI助手、需要全鏈路記憶的複雜Agent系統。
避坑指南:5個常見陷阱
陷阱1:無限制地儲存對話歷史
❌ 錯誤做法:
class BadMemory:
    """Anti-pattern kept on purpose: the history grows without any bound."""

    def __init__(self):
        self.history = []

    def add(self, msg: str):
        """Append *msg*; nothing is ever trimmed."""
        self.history.append(msg)
✅ 正確做法:
class GoodMemory:
    """History with an upper bound: the older half is cut off on overflow."""

    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        """Append *msg* and trim once more than ``max_messages`` are stored."""
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

    def _compress_old(self):
        """Remove the older half of the history and return it.

        The removed messages are returned (instead of being bound to an
        unused local) so a caller can summarize or persist them; ``add``
        simply discards them.
        """
        midpoint = len(self.history) // 2
        old = self.history[:midpoint]
        self.history = self.history[midpoint:]
        return old
陷阱2:向量檢索不做相似度閾值過濾
❌ 錯誤做法:
# Anti-pattern kept on purpose: every hit is appended to the context, no
# matter how weak the match is.
results = vector_store.similarity_search(query, k=5)
for r in results:
    context += r.page_content
✅ 正確做法:
SIMILARITY_THRESHOLD = 0.75
# NOTE(review): assumes vector_store is a LangChain VectorStore (suggested by
# `page_content`) - confirm. similarity_search_with_score may return a raw
# *distance* for some backends (lower = more similar), which would invert a
# `>= threshold` filter. The relevance-score variant is normalized to [0, 1]
# with higher = more similar, which is what the threshold expects.
results = vector_store.similarity_search_with_relevance_scores(query, k=10)
filtered = [
    (doc, score) for doc, score in results
    if score >= SIMILARITY_THRESHOLD
]
# Over-fetch (k=10), filter, then keep at most five documents.
context += "".join(doc.page_content for doc, _score in filtered[:5])
陷阱3:記憶寫入不做去重
❌ 錯誤做法:
def save_memory(content: str):
    """Anti-pattern kept on purpose: insert without checking for duplicates."""
    db.insert({"content": content})
✅ 正確做法:
import hashlib
def save_memory(content: str, metadata: dict | None = None):
    """Insert *content* unless an identical text is already stored.

    Duplicates are detected through an MD5 digest of the text; on a hit only
    ``updated_at`` is refreshed.

    NOTE(review): find-then-insert is not atomic; if several writers can run
    concurrently, presumably a unique index on ``content_hash`` or an upsert
    is needed - confirm against the actual ``db`` API.
    """
    # MD5 serves only as a cheap fingerprint here, not as a security
    # measure; the flag keeps it usable on FIPS-restricted builds.
    content_hash = hashlib.md5(
        content.encode(), usedforsecurity=False
    ).hexdigest()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": datetime.now()}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
        })
陷阱4:忽略記憶的時間衰減
❌ 錯誤做法:
# Anti-pattern kept on purpose: every stored memory is concatenated into the
# context with equal weight, regardless of its age.
all_memories = db.get_all()
context = "\n".join(m["content"] for m in all_memories)
✅ 正確做法:
from datetime import datetime, timedelta
def get_memories_with_decay(half_life_days: float = 30.0, top_k: int = 10):
    """Return the *top_k* memories ranked by time-decayed importance.

    Each memory's ``importance`` (default 0.5) is multiplied by
    ``0.5 ** (age_days / half_life_days)``.

    Args:
        half_life_days: Age at which a memory's weight has halved.
        top_k: Maximum number of memories to return (default 10).
    """
    now = datetime.now()
    scored = []
    for m in db.get_all():
        # Use fractional days: ``timedelta.days`` truncates, so everything
        # younger than 24 hours would be treated as age 0. Clamp at zero so
        # a timestamp in the future cannot boost the score above its
        # importance.
        age_days = max((now - m["created_at"]).total_seconds() / 86400, 0.0)
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [m for m, _ in scored[:max(top_k, 0)]]
陷阱5:摘要壓縮遺失關鍵細節
❌ 錯誤做法:
# Anti-pattern kept on purpose: the prompt gives no guidance on what the
# summary has to preserve.
summary_prompt = "總結以下對話:" + conversation_text
✅ 正確做法:
# The prompt enumerates what must survive compression: preferences and
# needs, decisions and conclusions, concrete values/names/dates, and open
# issues.
summary_prompt = (
"請將以下對話歷史壓縮為摘要,必須保留:\n"
"1. 使用者明確表達的偏好和需求\n"
"2. 已做出的重要決策和結論\n"
"3. 涉及的具體數值、名稱、日期\n"
"4. 未解決或待追蹤的問題\n\n"
f"對話內容:\n{conversation_text}"
)
報錯排查:10個常見錯誤
| # | 錯誤資訊 | 原因 | 解決方案 |
|---|---|---|---|
| 1 | Token limit exceeded | 對話歷史+系統提示超出模型上下文視窗 | 使用滑動視窗或摘要壓縮記憶 |
| 2 | Embedding dimension mismatch | 查詢向量與儲存向量維度不一致 | 統一使用同一Embedding模型 |
| 3 | Rate limit hit on vector DB | 高頻檢索觸發向量資料庫限流 | 批次查詢+本機快取 |
| 4 | Memory retrieval returns empty | 向量索引未建構或資料未寫入 | 檢查寫入是否commit,索引是否refresh |
| 5 | Context window too short for summary | 摘要本身過長佔用了對話空間 | 限制摘要長度,分層壓縮 |
| 6 | Knowledge graph cycle detected | 實體關係形成環路 | 新增關係時檢測環路並拒絕 |
| 7 | Stale memory causing wrong answers | 記憶未更新,使用了過期資訊 | 實作TTL機制和版本號管理 |
| 8 | Concurrent write conflict | 多Agent實例同時寫入記憶 | 使用樂觀鎖或分散式鎖 |
| 9 | Embedding model timeout | 大批次文字Embedding逾時 | 分批處理,每批不超過100條 |
| 10 | Memory leak in long-running agent | 長時間執行的Agent記憶體持續增長 | 定期清理低重要性記憶,設定上限 |
進階優化:3個關鍵技巧
1. 記憶分級儲存策略
from enum import Enum
from datetime import datetime, timedelta
class MemoryTier(Enum):
HOT = "hot"
WARM = "warm"
COLD = "cold"
class TieredMemoryStorage:
def __init__(self):
self.hot: list[dict] = []
self.warm: list[dict] = []
self.cold: list[dict] = []
def add(self, memory: dict) -> None:
memory["tier"] = MemoryTier.HOT.value
memory["access_count"] = 0
memory["created_at"] = datetime.now()
self.hot.append(memory)
def access(self, index: int) -> dict | None:
for tier in [self.hot, self.warm, self.cold]:
for mem in tier:
if mem.get("index") == index:
mem["access_count"] += 1
mem["last_accessed"] = datetime.now()
return mem
return None
def rebalance(self) -> None:
now = datetime.now()
for mem in self.hot[:]:
if (now - mem["last_accessed"]) > timedelta(hours=1):
self.hot.remove(mem)
mem["tier"] = MemoryTier.WARM.value
self.warm.append(mem)
for mem in self.warm[:]:
if (now - mem["last_accessed"]) > timedelta(days=7):
self.warm.remove(mem)
mem["tier"] = MemoryTier.COLD.value
self.cold.append(mem)
2. 非同步記憶寫入
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncMemoryWriter:
def __init__(self, max_workers: int = 4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.pending: asyncio.Queue = asyncio.Queue()
async def write(self, memory: dict) -> None:
await self.pending.put(memory)
async def flush(self) -> int:
count = 0
batch = []
while not self.pending.empty():
mem = await self.pending.get()
batch.append(mem)
count += 1
if batch:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
self.executor, self._batch_write, batch
)
return count
@staticmethod
def _batch_write(batch: list[dict]) -> None:
for mem in batch:
db.insert(mem)
async def start_periodic_flush(self, interval: float = 5.0):
while True:
await asyncio.sleep(interval)
await self.flush()
3. 記憶品質評分與自動淘汰
from datetime import datetime, timedelta
class MemoryQualityScorer:
    """Score memories by recency, access frequency, importance and relevance."""

    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        """
        Args:
            decay_rate: Per-hour multiplier applied to the recency score.
            min_score: Memories scoring below this are eviction candidates.
        """
        self.decay_rate = decay_rate
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        """Return the weighted quality score of *memory*.

        Weights: recency 0.3, frequency 0.2, importance 0.3, relevance 0.2.
        Missing ``importance`` / ``relevance`` default to 0.5.
        """
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        """Return ``decay_rate ** age_hours``; a missing timestamp counts as now."""
        now = datetime.now()
        age_hours = (now - memory.get("created_at", now)).total_seconds() / 3600
        # A timestamp in the future would give a negative age and push the
        # score above 1.0; treat it as brand new instead.
        return self.decay_rate ** max(age_hours, 0.0)

    def _frequency_score(self, memory: dict) -> float:
        """Return ``access_count / 10`` clamped to the range [0, 1]."""
        count = memory.get("access_count", 0)
        return min(1.0, max(0.0, count / 10.0))

    def should_evict(self, memory: dict) -> bool:
        """Return True when the score is below ``min_score``."""
        return self.score(memory) < self.min_score
對比分析:7種記憶模式全面對比
| 維度 | 對話緩衝 | 滑動視窗 | 摘要壓縮 | 向量語義 | 情景記憶 | 知識圖譜 | 混合層級 |
|---|---|---|---|---|---|---|---|
| 實作複雜度 | ★☆☆ | ★☆☆ | ★★☆ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| Token效率 | ★☆☆ | ★★☆ | ★★★ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| 檢索精度 | ★★☆ | ★★☆ | ★★☆ | ★★★★ | ★★★ | ★★★★ | ★★★★★ |
| 多跳推理 | ✗ | ✗ | ✗ | △ | ✗ | ✓ | ✓ |
| 跨工作階段 | ✗ | ✗ | △ | ✓ | ✓ | ✓ | ✓ |
| 寫入延遲 | ~1ms | ~1ms | ~500ms | ~50ms | ~10ms | ~10ms | ~100ms |
| 儲存成本 | 低 | 低 | 低 | 中 | 中 | 中 | 高 |
| 適用規模 | <20輪 | <50輪 | <200輪 | 10K+條 | 1K+條 | 10K+實體 | 無限制 |
| 典型場景 | 簡單問答 | 程式除錯 | 諮詢對話 | RAG增強 | 事件追蹤 | 知識推理 | 企業助手 |
★越多表示該維度的數值越高:Token效率、檢索精度為★越多越好,實作複雜度則為★越多越複雜;✓支援 △部分支援 ✗不支援
總結展望
AI Agent記憶系統正在從「錦上添花」變為「不可或缺」。2026年的趨勢:
- 原生記憶支援:LangGraph Memory、MemGPT等框架將記憶作為一等公民
- 多模態記憶:不僅記住文字,還記住影像、音訊、影片上下文
- 聯邦記憶:多Agent共享記憶池,同時保護隱私邊界
- 自適應壓縮:根據查詢意圖動態決定記憶壓縮粒度
- 記憶稽核:可追溯的記憶寫入和召回日誌,滿足合規要求
選擇記憶方案的原則:從簡單開始,按需升級。先用滑動視窗跑通流程,遇到Token瓶頸加摘要壓縮,需要跨工作階段就上向量資料庫,多跳推理再引入知識圖譜。別一上來就搞混合層級——那是最強大的方案,也是最複雜的。
在線工具推薦
本站提供瀏覽器本地工具,免註冊即可試用 →