Python AI Agent記憶系統:7種生產級長期記憶方案

AI与大数据

你的AI Agent為什麼總是「失憶」?

你花了3天調通的Agent,上線第一天使用者就投訴:「我昨天告訴你的偏好呢?怎麼又忘了?」這不是個例——AI Agent記憶系統是2026年生產落地的頭號難題。大模型本身沒有持久記憶,每次對話都是一張白紙。短期記憶撐不過一個工作階段,長期記憶又面臨檢索慢、儲存貴、一致性差三座大山。

更扎心的是:很多團隊用LangGraph搭了個Agent,把對話歷史往串列裡一塞就以為搞定了記憶。結果Token爆炸、成本飆升、關鍵資訊被截斷,Agent越聊越「傻」。Python Agent長期記憶不是加個資料庫就完事的,它需要一套完整的架構設計。


核心概念速查

概念 說明 典型實作
短期記憶 當前工作階段內的上下文資訊 對話歷史串列、滑動視窗
長期記憶 跨工作階段持久化的知識和偏好 向量資料庫、關聯式資料庫
情景記憶 對特定事件和經歷的回憶 時間索引+向量檢索
語義記憶 對概念和知識的結構化理解 知識圖譜、本體庫
工作記憶 當前推理步驟中的臨時資訊 Scratchpad、ReAct觀察
向量檢索 基於語義相似度的記憶召回 Embedding+FAISS/Chroma
記憶壓縮 將冗長歷史壓縮為摘要 LLM摘要、關鍵資訊提取

問題分析:AI Agent記憶系統的5大挑戰

# 挑戰 具體表現 影響
1 Token視窗溢出 對話歷史超過模型上下文長度 關鍵資訊被截斷,Agent「失憶」
2 檢索精度不足 向量檢索回傳無關記憶 Agent基於錯誤資訊做決策
3 記憶一致性衝突 新舊記憶矛盾,無法判斷誰對 輸出自相矛盾,使用者信任崩塌
4 冷啟動問題 新使用者無歷史記憶可用 個人化體驗差,留存率低
5 成本與延遲權衡 全量記憶檢索慢且貴 回應逾時或API帳單爆炸

這5個問題環環相扣:為了解決Token溢出你壓縮記憶,壓縮導致資訊遺失,遺失又加劇檢索精度問題。生產級Agent記憶架構必須系統性地解決這些問題,而不是頭痛醫頭。


分步實作:7種記憶實作模式

模式1:對話緩衝記憶(ConversationBufferMemory)

最簡單的記憶模式——把所有對話歷史原樣儲存。適合短對話場景。

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class Message:
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)


class ConversationBufferMemory:
    def __init__(self, max_tokens: int = 4000):
        self.messages: list[Message] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str) -> None:
        self.messages.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.messages]

    def estimate_tokens(self) -> int:
        return sum(len(m.content) // 4 for m in self.messages)

    def is_overflow(self) -> bool:
        return self.estimate_tokens() > self.max_tokens

    def clear(self) -> None:
        self.messages.clear()


memory = ConversationBufferMemory(max_tokens=4000)
memory.add("user", "我喜歡Python,請用Python回答")
memory.add("assistant", "好的,我會用Python來回答你的問題")
memory.add("user", "幫我寫一個快排演算法")

print(memory.get_context())
print(f"Token估算: {memory.estimate_tokens()}, 溢出: {memory.is_overflow()}")

適用場景:客服機器人、簡單問答,對話輪次 < 20。


模式2:滑動視窗記憶(SlidingWindowMemory)

只保留最近K輪對話,自動丟棄更早的歷史。Token可控,但會遺失早期資訊。

from collections import deque


class SlidingWindowMemory:
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        self.buffer.append(Message(role=role, content=content))
        while len(self.buffer) > self.window_size * 2:
            self.buffer.popleft()

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        return len(self.buffer)


window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    window_memory.add("user", f"第{i+1}個問題")
    window_memory.add("assistant", f"第{i+1}個回答")

print(f"視窗大小: {window_memory.size()}")
print(f"最近2輪: {window_memory.get_recent(k=2)}")

適用場景:長對話場景,只關心近期上下文,如程式除錯助手。


模式3:摘要壓縮記憶(SummaryCompressedMemory)

用LLM將歷史對話壓縮為摘要,保留關鍵資訊的同時大幅減少Token佔用。這是LangGraph記憶管理的核心思路之一。

from openai import OpenAI


class SummaryCompressedMemory:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in self.recent_messages[:-2]
        )
        prompt = (
            f"請將以下對話歷史壓縮為一段簡潔的摘要,保留所有關鍵資訊、"
            f"使用者偏好和重要決策:\n\n{conversation_text}\n\n"
            f"當前已有摘要:{self.summary}\n\n請輸出合併後的新摘要:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        self.summary = response.choices[0].message.content
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"對話歷史摘要:{self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context


summary_memory = SummaryCompressedMemory(
    api_key="your-api-key", max_raw_messages=6
)
for i in range(8):
    summary_memory.add("user", f"我想了解Python的{i+1}號特性")
    summary_memory.add("assistant", f"Python的{i+1}號特性是...")

print(f"上下文條目數: {len(summary_memory.get_context())}")

適用場景:多輪深度對話、諮詢類Agent,需要保留長期語義。


模式4:向量語義記憶(VectorSemanticMemory)

將記憶向量化儲存,透過語義相似度檢索相關記憶。這是向量資料庫記憶的核心實作。

import numpy as np
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MemoryItem:
    content: str
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)


class VectorSemanticMemory:
    def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
        self.embedding_dim = embedding_dim
        self.top_k = top_k
        self.memories: list[MemoryItem] = []

    def add(self, content: str, embedding: np.ndarray,
            metadata: dict | None = None) -> None:
        self.memories.append(MemoryItem(
            content=content,
            embedding=embedding,
            metadata=metadata or {},
        ))

    def search(self, query_embedding: np.ndarray,
               top_k: int | None = None) -> list[dict]:
        k = top_k or self.top_k
        if not self.memories:
            return []
        scores = []
        for mem in self.memories:
            sim = float(np.dot(query_embedding, mem.embedding) /
                        (np.linalg.norm(query_embedding) *
                         np.linalg.norm(mem.embedding) + 1e-8))
            scores.append((sim, mem))
        scores.sort(key=lambda x: x[0], reverse=True)
        return [
            {
                "content": mem.content,
                "score": score,
                "timestamp": mem.timestamp.isoformat(),
                "metadata": mem.metadata,
            }
            for score, mem in scores[:k]
        ]

    def delete_old(self, before: datetime) -> int:
        original_len = len(self.memories)
        self.memories = [m for m in self.memories if m.timestamp >= before]
        return original_len - len(self.memories)


vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    fake_embedding = np.random.randn(128)
    fake_embedding /= np.linalg.norm(fake_embedding)
    vector_memory.add(
        content=f"使用者偏好記錄{i+1}:喜歡Python和Rust",
        embedding=fake_embedding,
        metadata={"source": "chat", "turn": i},
    )

query = np.random.randn(128)
query /= np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for r in results:
    print(f"[{r['score']:.4f}] {r['content']}")

適用場景:RAG增強的Agent、個人化推薦、跨工作階段知識檢索。生產環境建議用Chroma或Milvus替換記憶體儲存。


模式5:情景記憶(EpisodicMemory)

記錄Agent經歷的特定事件,支援按時間和語義雙重檢索。

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class EmotionTag(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


@dataclass
class Episode:
    event: str
    context: str
    timestamp: datetime = field(default_factory=datetime.now)
    emotion: EmotionTag = EmotionTag.NEUTRAL
    importance: float = 0.5
    embedding: np.ndarray | None = None


class EpisodicMemory:
    def __init__(self, max_episodes: int = 1000):
        self.episodes: list[Episode] = []
        self.max_episodes = max_episodes

    def record(self, event: str, context: str,
               emotion: EmotionTag = EmotionTag.NEUTRAL,
               importance: float = 0.5,
               embedding: np.ndarray | None = None) -> None:
        self.episodes.append(Episode(
            event=event, context=context, emotion=emotion,
            importance=importance, embedding=embedding,
        ))
        if len(self.episodes) > self.max_episodes:
            self._evict()

    def _evict(self) -> None:
        self.episodes.sort(key=lambda e: e.importance, reverse=True)
        self.episodes = self.episodes[:self.max_episodes]

    def recall_by_time(self, start: datetime,
                       end: datetime) -> list[Episode]:
        return [
            ep for ep in self.episodes
            if start <= ep.timestamp <= end
        ]

    def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
        return [ep for ep in self.episodes if ep.importance >= threshold]

    def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
        return [ep for ep in self.episodes if ep.emotion == emotion]

    def get_recent(self, k: int = 5) -> list[Episode]:
        return self.episodes[-k:]


episodic_mem = EpisodicMemory(max_episodes=100)
episodic_mem.record(
    event="使用者回饋API回應慢",
    context="使用者在高峰期呼叫/v2/predict介面",
    emotion=EmotionTag.NEGATIVE,
    importance=0.9,
)
episodic_mem.record(
    event="使用者完成首次部署",
    context="使用Docker Compose部署成功",
    emotion=EmotionTag.POSITIVE,
    importance=0.7,
)

important = episodic_mem.recall_by_importance(0.8)
print(f"重要事件數: {len(important)}")
for ep in important:
    print(f"  [{ep.emotion.value}] {ep.event}")

適用場景:客服Agent記錄使用者投訴、維運Agent記錄故障事件。


模式6:知識圖譜記憶(KnowledgeGraphMemory)

用圖譜結構儲存實體和關係,支援多跳推理。這是Agent記憶架構中最強大的模式。

from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class Entity:
    name: str
    entity_type: str
    properties: dict = field(default_factory=dict)


@dataclass
class Relation:
    source: str
    target: str
    relation_type: str
    properties: dict = field(default_factory=dict)


class KnowledgeGraphMemory:
    def __init__(self):
        self.entities: dict[str, Entity] = {}
        self.relations: list[Relation] = []
        self._adjacency: dict[str, list[Relation]] = defaultdict(list)

    def add_entity(self, name: str, entity_type: str,
                   properties: dict | None = None) -> Entity:
        entity = Entity(name=name, entity_type=entity_type,
                        properties=properties or {})
        self.entities[name] = entity
        return entity

    def add_relation(self, source: str, target: str,
                     relation_type: str,
                     properties: dict | None = None) -> Relation:
        relation = Relation(source=source, target=target,
                            relation_type=relation_type,
                            properties=properties or {})
        self.relations.append(relation)
        self._adjacency[source].append(relation)
        self._adjacency[target].append(relation)
        return relation

    def get_entity(self, name: str) -> Entity | None:
        return self.entities.get(name)

    def get_relations_of(self, name: str) -> list[Relation]:
        return self._adjacency.get(name, [])

    def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
        visited = {start}
        current_level = {start}
        for _ in range(hops):
            next_level = set()
            for node in current_level:
                for rel in self._adjacency.get(node, []):
                    neighbor = rel.target if rel.source == node else rel.source
                    if neighbor not in visited:
                        next_level.add(neighbor)
                        visited.add(neighbor)
            current_level = next_level
        return visited

    def to_context_string(self, entity_name: str) -> str:
        entity = self.get_entity(entity_name)
        if not entity:
            return ""
        lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
        for rel in self.get_relations_of(entity_name):
            other = rel.target if rel.source == entity_name else rel.source
            lines.append(
                f"  - {rel.relation_type} -> {other} {rel.properties}"
            )
        return "\n".join(lines)


kg = KnowledgeGraphMemory()
kg.add_entity("張三", "使用者", {"偏好語言": "Python", "級別": "進階"})
kg.add_entity("FastAPI", "框架", {"類型": "Web", "版本": "0.115"})
kg.add_entity("Docker", "工具", {"類型": "容器化"})
kg.add_relation("張三", "FastAPI", "使用", {"頻率": "每天"})
kg.add_relation("張三", "Docker", "使用", {"頻率": "每週"})
kg.add_relation("FastAPI", "Docker", "部署方式")

print(kg.to_context_string("張三"))
print(f"2跳關聯: {kg.multi_hop_query('張三', hops=2)}")

適用場景:需要多跳推理的複雜Agent,如企業知識助手、醫療診斷Agent。


模式7:混合層級記憶(HybridHierarchicalMemory)

融合上述所有模式,按層級組織記憶。這是生產級Agent記憶的終極方案。

from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class MemoryConfig:
    short_term_window: int = 10
    summary_threshold: int = 8
    vector_top_k: int = 5
    kg_max_hops: int = 2
    importance_threshold: float = 0.7


class HybridHierarchicalMemory:
    def __init__(self, config: MemoryConfig | None = None):
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        self.short_term.add(role, content)
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str) -> None:
        self.knowledge_graph.add_relation(source, target, relation_type)

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        context_parts = []
        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })
        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })
        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })
        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context


hybrid = HybridHierarchicalMemory(MemoryConfig(
    short_term_window=5, summary_threshold=6,
    vector_top_k=3, importance_threshold=0.6,
))
hybrid.add_message("user", "幫我用FastAPI搭一個RAG服務", importance=0.8)
hybrid.add_message("assistant", "好的,我來幫你設計架構", importance=0.3)
hybrid.add_knowledge("FastAPI", "框架", {"非同步": True})
hybrid.add_knowledge("RAG", "架構", {"類型": "檢索增強生成"})
hybrid.add_knowledge_relation("FastAPI", "RAG", "實作框架")

ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    print(f"[{part['layer']}] {str(part['content'])[:100]}")

適用場景:企業級AI助手、需要全鏈路記憶的複雜Agent系統。


避坑指南:5個常見陷阱

陷阱1:無限制地儲存對話歷史

錯誤做法

class BadMemory:
    def __init__(self):
        self.history = []

    def add(self, msg: str):
        self.history.append(msg)

正確做法

class GoodMemory:
    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

    def _compress_old(self):
        old = self.history[:len(self.history) // 2]
        self.history = self.history[len(self.history) // 2:]

陷阱2:向量檢索不做相似度閾值過濾

錯誤做法

results = vector_store.similarity_search(query, k=5)
for r in results:
    context += r.page_content

正確做法

SIMILARITY_THRESHOLD = 0.75

results = vector_store.similarity_search_with_score(query, k=10)
filtered = [r for r in results if r[1] >= SIMILARITY_THRESHOLD]
for doc, score in filtered[:5]:
    context += doc.page_content

陷阱3:記憶寫入不做去重

錯誤做法

def save_memory(content: str):
    db.insert({"content": content})

正確做法

import hashlib


def save_memory(content: str, metadata: dict | None = None):
    content_hash = hashlib.md5(content.encode()).hexdigest()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": datetime.now()}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
        })

陷阱4:忽略記憶的時間衰減

錯誤做法

all_memories = db.get_all()
context = "\n".join(m["content"] for m in all_memories)

正確做法

from datetime import datetime, timedelta


def get_memories_with_decay(half_life_days: float = 30.0):
    now = datetime.now()
    memories = db.get_all()
    scored = []
    for m in memories:
        age_days = (now - m["created_at"]).days
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [m for m, s in scored[:10]]

陷阱5:摘要壓縮遺失關鍵細節

錯誤做法

summary_prompt = "總結以下對話:" + conversation_text

正確做法

summary_prompt = (
    "請將以下對話歷史壓縮為摘要,必須保留:\n"
    "1. 使用者明確表達的偏好和需求\n"
    "2. 已做出的重要決策和結論\n"
    "3. 涉及的具體數值、名稱、日期\n"
    "4. 未解決或待追蹤的問題\n\n"
    f"對話內容:\n{conversation_text}"
)

報錯排查:10個常見錯誤

# 錯誤資訊 原因 解決方案
1 Token limit exceeded 對話歷史+系統提示超出模型上下文視窗 使用滑動視窗或摘要壓縮記憶
2 Embedding dimension mismatch 查詢向量與儲存向量維度不一致 統一使用同一Embedding模型
3 Rate limit hit on vector DB 高頻檢索觸發向量資料庫限流 批次查詢+本機快取
4 Memory retrieval returns empty 向量索引未建構或資料未寫入 檢查寫入是否commit,索引是否refresh
5 Context window too short for summary 摘要本身過長佔用了對話空間 限制摘要長度,分層壓縮
6 Knowledge graph cycle detected 實體關係形成環路 新增關係時檢測環路並拒絕
7 Stale memory causing wrong answers 記憶未更新,使用了過期資訊 實作TTL機制和版本號管理
8 Concurrent write conflict 多Agent實例同時寫入記憶 使用樂觀鎖或分散式鎖
9 Embedding model timeout 大批次文字Embedding逾時 分批處理,每批不超過100條
10 Memory leak in long-running agent 長時間執行的Agent記憶體持續增長 定期清理低重要性記憶,設定上限

進階優化:3個關鍵技巧

1. 記憶分級儲存策略

from enum import Enum
from datetime import datetime, timedelta


class MemoryTier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"


class TieredMemoryStorage:
    def __init__(self):
        self.hot: list[dict] = []
        self.warm: list[dict] = []
        self.cold: list[dict] = []

    def add(self, memory: dict) -> None:
        memory["tier"] = MemoryTier.HOT.value
        memory["access_count"] = 0
        memory["created_at"] = datetime.now()
        self.hot.append(memory)

    def access(self, index: int) -> dict | None:
        for tier in [self.hot, self.warm, self.cold]:
            for mem in tier:
                if mem.get("index") == index:
                    mem["access_count"] += 1
                    mem["last_accessed"] = datetime.now()
                    return mem
        return None

    def rebalance(self) -> None:
        now = datetime.now()
        for mem in self.hot[:]:
            if (now - mem["last_accessed"]) > timedelta(hours=1):
                self.hot.remove(mem)
                mem["tier"] = MemoryTier.WARM.value
                self.warm.append(mem)
        for mem in self.warm[:]:
            if (now - mem["last_accessed"]) > timedelta(days=7):
                self.warm.remove(mem)
                mem["tier"] = MemoryTier.COLD.value
                self.cold.append(mem)

2. 非同步記憶寫入

import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncMemoryWriter:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.pending.put(memory)

    async def flush(self) -> int:
        count = 0
        batch = []
        while not self.pending.empty():
            mem = await self.pending.get()
            batch.append(mem)
            count += 1
        if batch:
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                self.executor, self._batch_write, batch
            )
        return count

    @staticmethod
    def _batch_write(batch: list[dict]) -> None:
        for mem in batch:
            db.insert(mem)

    async def start_periodic_flush(self, interval: float = 5.0):
        while True:
            await asyncio.sleep(interval)
            await self.flush()

3. 記憶品質評分與自動淘汰

from datetime import datetime, timedelta


class MemoryQualityScorer:
    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        self.decay_rate = decay_rate
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        age_hours = (datetime.now() - memory.get(
            "created_at", datetime.now()
        )).total_seconds() / 3600
        return self.decay_rate ** age_hours

    def _frequency_score(self, memory: dict) -> float:
        count = memory.get("access_count", 0)
        return min(1.0, count / 10.0)

    def should_evict(self, memory: dict) -> bool:
        return self.score(memory) < self.min_score

對比分析:7種記憶模式全面對比

維度 對話緩衝 滑動視窗 摘要壓縮 向量語義 情景記憶 知識圖譜 混合層級
實作複雜度 ★☆☆ ★☆☆ ★★☆ ★★★ ★★☆ ★★★ ★★★★
Token效率 ★☆☆ ★★☆ ★★★ ★★★ ★★☆ ★★★ ★★★★
檢索精度 ★★☆ ★★☆ ★★☆ ★★★★ ★★★ ★★★★ ★★★★★
多跳推理
跨工作階段
寫入延遲 ~1ms ~1ms ~500ms ~50ms ~10ms ~10ms ~100ms
儲存成本
適用規模 <20輪 <50輪 <200輪 10K+條 1K+條 10K+實體 無限制
典型場景 簡單問答 程式除錯 諮詢對話 RAG增強 事件追蹤 知識推理 企業助手

★越多表示該維度表現越好;✓支援 △部分支援 ✗不支援


總結展望

AI Agent記憶系統正在從「錦上添花」變為「不可或缺」。2026年的趨勢:

  1. 原生記憶支援:LangGraph Memory、MemGPT等框架將記憶作為一等公民
  2. 多模態記憶:不僅記住文字,還記住影像、音訊、影片上下文
  3. 聯邦記憶:多Agent共享記憶池,同時保護隱私邊界
  4. 自適應壓縮:根據查詢意圖動態決定記憶壓縮粒度
  5. 記憶稽核:可追溯的記憶寫入和召回日誌,滿足合規要求

選擇記憶方案的原則:從簡單開始,按需升級。先用滑動視窗跑通流程,遇到Token瓶頸加摘要壓縮,需要跨工作階段就上向量資料庫,多跳推理再引入知識圖譜。別一上來就搞混合層級——那是最強大的方案,也是最複雜的。


在線工具推薦

本站提供瀏覽器本地工具,免註冊即可試用 →

#AI Agent#记忆系统#LangGraph#向量数据库#生产部署#Python#2026#AI与大数据