Python AI Agent Memory System: 7 Production Patterns for Long-Term Memory

Why Does Your AI Agent Keep "Forgetting"?

You spent three days fine-tuning your Agent, and on day one in production, users complain: "I told you my preferences yesterday. Why did you forget again?" This is not an isolated case: memory is one of the hardest problems in deploying AI Agents to production in 2026. LLMs have no persistent memory by design; every conversation starts from a blank slate. Short-term memory does not survive beyond a single session, while long-term memory faces three obstacles: slow retrieval, expensive storage, and poor consistency.

The harsh truth: many teams build an Agent with LangGraph, stuff conversation history into a list, and call it "memory." The result? Token explosion, skyrocketing costs, critical information truncated, and the Agent getting dumber with every turn. Python Agent long-term memory isn't just about adding a database — it requires a complete architecture design.


Core Concepts at a Glance

| Concept | Description | Typical Implementation |
| --- | --- | --- |
| Short-term Memory | Contextual information within the current session | Conversation history list, sliding window |
| Long-term Memory | Knowledge and preferences persisted across sessions | Vector database, relational database |
| Episodic Memory | Recollection of specific events and experiences | Temporal index + vector retrieval |
| Semantic Memory | Structured understanding of concepts and knowledge | Knowledge graph, ontology |
| Working Memory | Temporary information during current reasoning steps | Scratchpad, ReAct observations |
| Vector Retrieval | Memory recall based on semantic similarity | Embedding + FAISS/Chroma |
| Memory Compression | Compressing lengthy history into summaries | LLM summarization, key information extraction |

Problem Analysis: 5 Major Challenges of AI Agent Memory Systems

| # | Challenge | Manifestation | Impact |
| --- | --- | --- | --- |
| 1 | Token Window Overflow | Conversation history exceeds model context length | Critical information truncated, Agent "forgets" |
| 2 | Insufficient Retrieval Precision | Vector search returns irrelevant memories | Agent makes decisions based on wrong information |
| 3 | Memory Consistency Conflicts | New and old memories contradict, no way to determine which is correct | Self-contradictory output, user trust collapses |
| 4 | Cold Start Problem | New users have no historical memory available | Poor personalization, low retention |
| 5 | Cost vs. Latency Tradeoff | Full-memory retrieval is slow and expensive | Response timeouts or API bill explosions |

These five problems are deeply interconnected: to solve token overflow you compress memory, compression causes information loss, and loss worsens retrieval precision. A production Agent memory architecture must address these issues systematically — not by treating symptoms in isolation.


Step-by-Step Implementation: 7 Memory Patterns

Pattern 1: ConversationBufferMemory

The simplest memory pattern — store all conversation history as-is. Suitable for short conversations.

from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Message:
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)


class ConversationBufferMemory:
    def __init__(self, max_tokens: int = 4000):
        self.messages: list[Message] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str) -> None:
        self.messages.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.messages]

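    def estimate_tokens(self) -> int:
        # Rough heuristic: about 4 characters per token for English text.
        # Use the model's own tokenizer when an exact count matters.
        return sum(len(m.content) // 4 for m in self.messages)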

    def is_overflow(self) -> bool:
        return self.estimate_tokens() > self.max_tokens

    def clear(self) -> None:
        self.messages.clear()


memory = ConversationBufferMemory(max_tokens=4000)
memory.add("user", "I prefer Python, please answer in Python")
memory.add("assistant", "Sure, I'll use Python for all responses")
memory.add("user", "Write a quicksort algorithm for me")

print(memory.get_context())
print(f"Token estimate: {memory.estimate_tokens()}, overflow: {memory.is_overflow()}")

Best for: Customer service bots, simple Q&A, conversations with fewer than 20 turns.
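The buffer above detects overflow but never acts on it. One way to react, sketched here under the same 4-characters-per-token heuristic, is to drop the oldest messages until the estimate fits the budget. `trim_to_budget` is a hypothetical helper written for this illustration, not part of any library.

```python
def estimate_tokens(messages: list[dict]) -> int:
    # Same rough heuristic as the buffer above: about 4 characters per token.
    return sum(len(m["content"]) // 4 for m in messages)


def trim_to_budget(messages: list[dict], max_tokens: int) -> list[dict]:
    """Drop the oldest messages until the estimate fits the budget.

    The most recent message is always kept, even if it alone exceeds the budget.
    """
    trimmed = list(messages)
    while len(trimmed) > 1 and estimate_tokens(trimmed) > max_tokens:
        trimmed.pop(0)
    return trimmed


history = [
    {"role": "user", "content": "a" * 400},       # ~100 tokens
    {"role": "assistant", "content": "b" * 400},  # ~100 tokens
    {"role": "user", "content": "c" * 400},       # ~100 tokens
]
kept = trim_to_budget(history, max_tokens=200)
print(len(kept), kept[0]["role"])
```

Dropping from the front is the crudest policy available; it is also exactly how early preferences get lost, which is what the later patterns try to avoid.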


Pattern 2: SlidingWindowMemory

Keep only the most recent K turns, automatically discarding earlier history. Token usage is predictable, but early context is lost.

from collections import deque


class SlidingWindowMemory:
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        # deque(maxlen=...) drops the oldest message automatically once full;
        # each turn is one user message plus one assistant message.
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        self.buffer.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        return len(self.buffer)


window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    window_memory.add("user", f"Question #{i+1}")
    window_memory.add("assistant", f"Answer #{i+1}")

print(f"Window size: {window_memory.size()}")
print(f"Last 2 turns: {window_memory.get_recent(k=2)}")

Best for: Long conversations where only recent context matters, such as code debugging assistants.
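A plain sliding window eventually discards whatever the user said first, including stated preferences. A possible mitigation, shown as a minimal sketch, is to keep a small set of pinned messages outside the window. `PinnedWindowMemory` and its `pin` method are hypothetical names for this illustration.

```python
from collections import deque


class PinnedWindowMemory:
    """Sliding window plus a small set of pinned messages that never expire."""

    def __init__(self, window_size: int = 5):
        self.pinned: list[dict] = []
        # Each turn is one user message plus one assistant message.
        self.buffer: deque = deque(maxlen=window_size * 2)

    def pin(self, content: str) -> None:
        # Pinned facts are sent as system messages ahead of the window.
        self.pinned.append({"role": "system", "content": content})

    def add(self, role: str, content: str) -> None:
        self.buffer.append({"role": role, "content": content})

    def get_context(self) -> list[dict]:
        return self.pinned + list(self.buffer)


pinned_memory = PinnedWindowMemory(window_size=2)
pinned_memory.pin("User prefers answers in Python")
for i in range(10):
    pinned_memory.add("user", f"Question #{i + 1}")
    pinned_memory.add("assistant", f"Answer #{i + 1}")

context = pinned_memory.get_context()
print(len(context), context[0]["content"])
```

The pinned list has no bound in this sketch, so deciding what deserves pinning (and when to unpin) is left to the caller.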


Pattern 3: SummaryCompressedMemory

Use an LLM to compress conversation history into a summary, preserving key information while drastically reducing token consumption. This is a core technique in LangGraph memory management.

from openai import OpenAI


class SummaryCompressedMemory:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in self.recent_messages[:-2]
        )
        prompt = (
            f"Compress the following conversation history into a concise summary, "
            f"preserving all key information, user preferences, and important decisions:\n\n"
            f"{conversation_text}\n\n"
            f"Existing summary: {self.summary}\n\nOutput the merged new summary:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        # message.content can be None; keep the previous summary in that case.
        self.summary = response.choices[0].message.content or self.summary
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Conversation history summary: {self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context


summary_memory = SummaryCompressedMemory(
    api_key="your-api-key", max_raw_messages=6
)
for i in range(8):
    summary_memory.add("user", f"I want to learn about Python feature #{i+1}")
    summary_memory.add("assistant", f"Python feature #{i+1} is...")

print(f"Context entries: {len(summary_memory.get_context())}")

Best for: Multi-turn deep conversations, consulting Agents that need long-term semantic retention.
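The compression loop can be exercised without an API key by injecting the summarizer as a function. The sketch below assumes a hypothetical `RollingSummary` class and a `fake_summarize` stand-in that only counts folded lines; a real deployment would pass a function that calls an LLM instead.

```python
from typing import Callable


class RollingSummary:
    def __init__(self, summarize: Callable[[str, list], str],
                 max_raw: int = 4, keep_recent: int = 2):
        # keep_recent must be at least 1 for the slicing below to work.
        self.summarize = summarize
        self.max_raw = max_raw
        self.keep_recent = keep_recent
        self.summary = ""
        self.recent: list[str] = []

    def add(self, line: str) -> None:
        self.recent.append(line)
        if len(self.recent) > self.max_raw:
            # Fold everything except the newest lines into the summary.
            old = self.recent[:-self.keep_recent]
            self.summary = self.summarize(self.summary, old)
            self.recent = self.recent[-self.keep_recent:]


def fake_summarize(previous: str, lines: list) -> str:
    # Stand-in for an LLM call: only tracks how many lines were folded in.
    folded = int(previous.split()[0]) if previous else 0
    return f"{folded + len(lines)} earlier lines summarized"


mem = RollingSummary(fake_summarize, max_raw=4, keep_recent=2)
for i in range(8):
    mem.add(f"user: message {i + 1}")

print(mem.summary)
print(mem.recent)
```

Injecting the summarizer also makes it straightforward to unit-test the trigger logic separately from prompt quality.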


Pattern 4: VectorSemanticMemory

Vectorize and store memories, then retrieve relevant ones via semantic similarity. This is the core implementation of vector database memory.

import numpy as np
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MemoryItem:
    content: str
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)


class VectorSemanticMemory:
    def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
        self.embedding_dim = embedding_dim
        self.top_k = top_k
        self.memories: list[MemoryItem] = []

    def add(self, content: str, embedding: np.ndarray,
            metadata: dict | None = None) -> None:
        self.memories.append(MemoryItem(
            content=content,
            embedding=embedding,
            metadata=metadata or {},
        ))

    def search(self, query_embedding: np.ndarray,
               top_k: int | None = None) -> list[dict]:
        k = top_k or self.top_k
        if not self.memories:
            return []
        scores = []
        for mem in self.memories:
            sim = float(np.dot(query_embedding, mem.embedding) /
                        (np.linalg.norm(query_embedding) *
                         np.linalg.norm(mem.embedding) + 1e-8))
            scores.append((sim, mem))
        scores.sort(key=lambda x: x[0], reverse=True)
        return [
            {
                "content": mem.content,
                "score": score,
                "timestamp": mem.timestamp.isoformat(),
                "metadata": mem.metadata,
            }
            for score, mem in scores[:k]
        ]

    def delete_old(self, before: datetime) -> int:
        original_len = len(self.memories)
        self.memories = [m for m in self.memories if m.timestamp >= before]
        return original_len - len(self.memories)


vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    fake_embedding = np.random.randn(128)
    fake_embedding /= np.linalg.norm(fake_embedding)
    vector_memory.add(
        content=f"User preference record #{i+1}: likes Python and Rust",
        embedding=fake_embedding,
        metadata={"source": "chat", "turn": i},
    )

query = np.random.randn(128)
query /= np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for r in results:
    print(f"[{r['score']:.4f}] {r['content']}")

Best for: RAG-enhanced Agents, personalized recommendations, cross-session knowledge retrieval. For production, replace in-memory storage with Chroma or Milvus.
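The demo above uses random vectors, so its ranking carries no meaning. To see semantic ranking behave sensibly without calling an embedding API, a toy hashed bag-of-words embedding can stand in. This is an illustration only: `toy_embed` is a hypothetical helper and is far weaker than a real embedding model.

```python
import math
import zlib


def toy_embed(text: str, dim: int = 1024) -> list[float]:
    """Hashed bag-of-words vector, L2-normalized. For demos only."""
    vec = [0.0] * dim
    for word in text.lower().split():
        # crc32 is stable across runs, unlike Python's built-in hash() for str.
        vec[zlib.crc32(word.encode("utf-8")) % dim] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm else vec


def cosine(a: list[float], b: list[float]) -> float:
    # Inputs are unit length (or all zeros), so the dot product is the cosine.
    return sum(x * y for x, y in zip(a, b))


memories = [
    "user prefers python for backend services",
    "user deploys with docker compose",
    "user dislikes long meetings",
]
query_vec = toy_embed("which language does the user prefer for backend")
ranked = sorted(memories, key=lambda m: cosine(query_vec, toy_embed(m)),
                reverse=True)
print(ranked[0])
```

The first memory shares the most words with the query, so it should rank first. Note that "prefer" and "prefers" do not match here, which is the kind of gap a real embedding model closes.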


Pattern 5: EpisodicMemory

Record specific events the Agent has experienced, supporting dual retrieval by time and semantics.

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

import numpy as np


class EmotionTag(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


@dataclass
class Episode:
    event: str
    context: str
    timestamp: datetime = field(default_factory=datetime.now)
    emotion: EmotionTag = EmotionTag.NEUTRAL
    importance: float = 0.5
    embedding: np.ndarray | None = None


class EpisodicMemory:
    def __init__(self, max_episodes: int = 1000):
        self.episodes: list[Episode] = []
        self.max_episodes = max_episodes

    def record(self, event: str, context: str,
               emotion: EmotionTag = EmotionTag.NEUTRAL,
               importance: float = 0.5,
               embedding: np.ndarray | None = None) -> None:
        self.episodes.append(Episode(
            event=event, context=context, emotion=emotion,
            importance=importance, embedding=embedding,
        ))
        if len(self.episodes) > self.max_episodes:
            self._evict()

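    def _evict(self) -> None:
        # Keep the most important episodes, then restore chronological order
        # so get_recent() still returns the latest events.
        kept = sorted(self.episodes, key=lambda e: e.importance,
                      reverse=True)[:self.max_episodes]
        self.episodes = sorted(kept, key=lambda e: e.timestamp)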

    def recall_by_time(self, start: datetime,
                       end: datetime) -> list[Episode]:
        return [
            ep for ep in self.episodes
            if start <= ep.timestamp <= end
        ]

    def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
        return [ep for ep in self.episodes if ep.importance >= threshold]

    def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
        return [ep for ep in self.episodes if ep.emotion == emotion]

    def get_recent(self, k: int = 5) -> list[Episode]:
        return self.episodes[-k:]


episodic_mem = EpisodicMemory(max_episodes=100)
episodic_mem.record(
    event="User reported slow API response",
    context="User calling /v2/predict endpoint during peak hours",
    emotion=EmotionTag.NEGATIVE,
    importance=0.9,
)
episodic_mem.record(
    event="User completed first deployment",
    context="Successfully deployed using Docker Compose",
    emotion=EmotionTag.POSITIVE,
    importance=0.7,
)

important = episodic_mem.recall_by_importance(0.8)
print(f"Important events: {len(important)}")
for ep in important:
    print(f"  [{ep.emotion.value}] {ep.event}")

Best for: Customer service Agents logging complaints, ops Agents tracking incident events.
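The class above covers retrieval by time, importance, and emotion; the semantic half of the dual retrieval is not shown. A minimal sketch of combining both, assuming unit-length embeddings stored as plain lists, filters by time window first and then ranks by similarity. `TimedEpisode` and `recall` are hypothetical names for this illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class TimedEpisode:
    event: str
    timestamp: datetime
    embedding: list  # assumed to be a unit-length list of floats


def dot(a: list, b: list) -> float:
    return sum(x * y for x, y in zip(a, b))


def recall(episodes: list, query_embedding: list,
           start: datetime, end: datetime, top_k: int = 3) -> list:
    # Step 1: temporal filter. Step 2: semantic ranking inside the window.
    in_window = [ep for ep in episodes if start <= ep.timestamp <= end]
    in_window.sort(key=lambda ep: dot(query_embedding, ep.embedding),
                   reverse=True)
    return in_window[:top_k]


now = datetime(2026, 1, 10, 12, 0)
episodes = [
    TimedEpisode("API latency complaint", now - timedelta(days=1), [1.0, 0.0]),
    TimedEpisode("First Docker deployment", now - timedelta(days=2), [0.0, 1.0]),
    TimedEpisode("Old latency complaint", now - timedelta(days=30), [1.0, 0.0]),
]
hits = recall(episodes, [0.8, 0.6], start=now - timedelta(days=7), end=now)
print([ep.event for ep in hits])
```

Filtering before ranking keeps the similarity computation small; the 30-day-old complaint never reaches the ranking step even though its embedding matches the query well.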


Pattern 6: KnowledgeGraphMemory

Store entities and relationships in a graph structure, enabling multi-hop reasoning. Of the single-store patterns, this is the most expressive.

from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class Entity:
    name: str
    entity_type: str
    properties: dict = field(default_factory=dict)


@dataclass
class Relation:
    source: str
    target: str
    relation_type: str
    properties: dict = field(default_factory=dict)


class KnowledgeGraphMemory:
    def __init__(self):
        self.entities: dict[str, Entity] = {}
        self.relations: list[Relation] = []
        self._adjacency: dict[str, list[Relation]] = defaultdict(list)

    def add_entity(self, name: str, entity_type: str,
                   properties: dict | None = None) -> Entity:
        entity = Entity(name=name, entity_type=entity_type,
                        properties=properties or {})
        self.entities[name] = entity
        return entity

    def add_relation(self, source: str, target: str,
                     relation_type: str,
                     properties: dict | None = None) -> Relation:
        relation = Relation(source=source, target=target,
                            relation_type=relation_type,
                            properties=properties or {})
        self.relations.append(relation)
        self._adjacency[source].append(relation)
        self._adjacency[target].append(relation)
        return relation

    def get_entity(self, name: str) -> Entity | None:
        return self.entities.get(name)

    def get_relations_of(self, name: str) -> list[Relation]:
        return self._adjacency.get(name, [])

    def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
        visited = {start}
        current_level = {start}
        for _ in range(hops):
            next_level = set()
            for node in current_level:
                for rel in self._adjacency.get(node, []):
                    neighbor = rel.target if rel.source == node else rel.source
                    if neighbor not in visited:
                        next_level.add(neighbor)
                        visited.add(neighbor)
            current_level = next_level
        return visited

    def to_context_string(self, entity_name: str) -> str:
        entity = self.get_entity(entity_name)
        if not entity:
            return ""
        lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
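        for rel in self.get_relations_of(entity_name):
            # Show edge direction: "->" for outgoing, "<-" for incoming.
            if rel.source == entity_name:
                arrow, other = "->", rel.target
            else:
                arrow, other = "<-", rel.source
            lines.append(
                f"  - {rel.relation_type} {arrow} {other} {rel.properties}"
            )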
        return "\n".join(lines)


kg = KnowledgeGraphMemory()
kg.add_entity("Alice", "User", {"preferred_language": "Python", "level": "senior"})
kg.add_entity("FastAPI", "Framework", {"type": "Web", "version": "0.115"})
kg.add_entity("Docker", "Tool", {"type": "Containerization"})
kg.add_relation("Alice", "FastAPI", "uses", {"frequency": "daily"})
kg.add_relation("Alice", "Docker", "uses", {"frequency": "weekly"})
kg.add_relation("FastAPI", "Docker", "deployment_method")

print(kg.to_context_string("Alice"))
print(f"2-hop connections: {kg.multi_hop_query('Alice', hops=2)}")

Best for: Complex Agents requiring multi-hop reasoning, such as enterprise knowledge assistants and medical diagnosis Agents.
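`multi_hop_query` returns the set of reachable entities but not how they are connected. For reasoning, the chain of relations is often what the prompt needs. The sketch below is a plain breadth-first search over undirected edges; `find_path` and the `hosted_on` edge to `AWS` are hypothetical additions for illustration.

```python
from collections import deque


def find_path(edges: list, start: str, goal: str) -> list:
    """Breadth-first search over undirected edges.

    Returns the shortest chain as alternating entities and relations,
    or an empty list when no path exists.
    """
    adjacency: dict = {}
    for source, relation, target in edges:
        adjacency.setdefault(source, []).append((relation, target))
        adjacency.setdefault(target, []).append((relation, source))
    queue = deque([(start, [start])])
    visited = {start}  # also prevents looping on cyclic graphs
    while queue:
        node, path = queue.popleft()
        if node == goal:
            return path
        for relation, neighbor in adjacency.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append((neighbor, path + [f"-[{relation}]-", neighbor]))
    return []


edges = [
    ("Alice", "uses", "FastAPI"),
    ("Alice", "uses", "Docker"),
    ("FastAPI", "deployment_method", "Docker"),
    ("Docker", "hosted_on", "AWS"),  # hypothetical extra edge
]
print(" ".join(find_path(edges, "Alice", "AWS")))
```

Because the search tracks visited nodes, the cycle between Alice, FastAPI, and Docker does not cause repeated work.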


Pattern 7: HybridHierarchicalMemory

Fuse all the above patterns into a layered memory system. This is the ultimate solution for production Agent memory.

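from dataclasses import dataclass, field
from datetime import datetime, timedelta

import numpy as np

# Reuses SlidingWindowMemory, VectorSemanticMemory, EpisodicMemory, and
# KnowledgeGraphMemory from the patterns above.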


@dataclass
class MemoryConfig:
    short_term_window: int = 10
    summary_threshold: int = 8
    vector_top_k: int = 5
    kg_max_hops: int = 2
    importance_threshold: float = 0.7


class HybridHierarchicalMemory:
    def __init__(self, config: MemoryConfig | None = None):
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        self.short_term.add(role, content)
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str) -> None:
        self.knowledge_graph.add_relation(source, target, relation_type)

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        context_parts = []
        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })
        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })
        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })
        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context


hybrid = HybridHierarchicalMemory(MemoryConfig(
    short_term_window=5, summary_threshold=6,
    vector_top_k=3, importance_threshold=0.6,
))
hybrid.add_message("user", "Help me build a RAG service with FastAPI", importance=0.8)
hybrid.add_message("assistant", "Sure, let me design the architecture", importance=0.3)
hybrid.add_knowledge("FastAPI", "Framework", {"async": True})
hybrid.add_knowledge("RAG", "Architecture", {"type": "Retrieval-Augmented Generation"})
hybrid.add_knowledge_relation("FastAPI", "RAG", "implementation_framework")

ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    print(f"[{part['layer']}] {str(part['content'])[:100]}")

Best for: Enterprise-grade AI assistants, complex Agent systems requiring full-pipeline memory.
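The layered output still has to be turned into chat messages before it reaches the model. One possible mapping, sketched here as an assumption rather than a prescribed format, puts all long-term layers into a single system message and appends the short-term turns verbatim. `layers_to_messages` is a hypothetical helper; the input mirrors the `{"layer": ..., "content": ...}` shape used above.

```python
def layers_to_messages(layers: list) -> list:
    """Turn layered retrieval output into chat messages.

    Long-term layers become one system message; short-term turns follow as-is.
    """
    notes: list = []
    turns: list = []
    for part in layers:
        if part["layer"] == "short_term":
            turns.extend(part["content"])
        else:
            notes.append(f"[{part['layer']}] {part['content']}")
    messages = []
    if notes:
        messages.append({"role": "system", "content": "\n".join(notes)})
    return messages + turns


layers = [
    {"layer": "short_term", "content": [
        {"role": "user", "content": "Help me build a RAG service with FastAPI"},
    ]},
    {"layer": "episodic", "content": [
        {"event": "User prefers async code", "importance": 0.8},
    ]},
    {"layer": "knowledge_graph", "content": "FastAPI(Framework): {'async': True}"},
]
messages = layers_to_messages(layers)
for m in messages:
    print(m["role"], "|", m["content"][:60])
```

Keeping the layer name as a prefix lets the model (and anyone reading logs) tell where each piece of context came from.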


Pitfall Guide: 5 Common Traps

Pitfall 1: Storing Conversation History Without Limits

Wrong:

class BadMemory:
    def __init__(self):
        self.history = []

    def add(self, msg: str):
        self.history.append(msg)

Correct:

class GoodMemory:
    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

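    def _compress_old(self):
        half = len(self.history) // 2
        old, self.history = self.history[:half], self.history[half:]
        # Placeholder: in production, replace this marker with an LLM summary of `old`.
        self.history.insert(0, f"[summary of {len(old)} earlier messages]")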

Pitfall 2: Vector Retrieval Without a Similarity Threshold

Wrong:

context = ""
results = vector_store.similarity_search(query, k=5)
for r in results:
    context += r.page_content

Correct:

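SIMILARITY_THRESHOLD = 0.75

# Assumes a LangChain-style vector store. Relevance scores are normalized to
# [0, 1] with higher meaning more similar; similarity_search_with_score may
# return a raw distance instead, where lower is better.
context = ""
results = vector_store.similarity_search_with_relevance_scores(query, k=10)
filtered = [(doc, score) for doc, score in results
            if score >= SIMILARITY_THRESHOLD]
for doc, score in filtered[:5]:
    context += doc.page_content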

Pitfall 3: No Deduplication on Memory Writes

Wrong:

def save_memory(content: str):
    db.insert({"content": content})

Correct:

import hashlib
from datetime import datetime


def save_memory(content: str, metadata: dict | None = None):
    content_hash = hashlib.md5(content.encode()).hexdigest()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": datetime.now()}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
        })

Pitfall 4: Ignoring Temporal Decay of Memories

Wrong:

all_memories = db.get_all()
context = "\n".join(m["content"] for m in all_memories)

Correct:

from datetime import datetime, timedelta


def get_memories_with_decay(half_life_days: float = 30.0):
    now = datetime.now()
    memories = db.get_all()
    scored = []
    for m in memories:
        age_days = (now - m["created_at"]).total_seconds() / 86400  # fractional days
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [m for m, s in scored[:10]]
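To make the half-life formula concrete, the following sketch evaluates the same expression, 0.5 ** (age_days / half_life_days), at a few ages. `decay_weight` is a hypothetical helper name.

```python
def decay_weight(age_days: float, half_life_days: float = 30.0) -> float:
    # The weight halves every half_life_days.
    return 0.5 ** (age_days / half_life_days)


for age in (0, 30, 60, 90):
    print(age, decay_weight(age))

# An important but old memory can rank below a fresh, ordinary one.
old_important = decay_weight(60) * 0.9   # 0.25 * 0.9
fresh_ordinary = decay_weight(0) * 0.5   # 1.0 * 0.5
print(old_important < fresh_ordinary)
```

With a 30-day half-life the weights are 1.0, 0.5, 0.25, and 0.125, so a memory with importance 0.9 that is 60 days old scores 0.225 and loses to a brand-new memory with importance 0.5. Whether that is desirable depends on the application; a longer half-life shifts the balance toward importance.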

Pitfall 5: Losing Critical Details During Summary Compression

Wrong:

summary_prompt = "Summarize the following conversation: " + conversation_text

Correct:

summary_prompt = (
    "Compress the following conversation history into a summary. "
    "You MUST preserve:\n"
    "1. Explicitly stated user preferences and requirements\n"
    "2. Important decisions and conclusions already made\n"
    "3. Specific numbers, names, and dates mentioned\n"
    "4. Unresolved or follow-up items\n\n"
    f"Conversation content:\n{conversation_text}"
)

Error Troubleshooting: 10 Common Errors

| # | Error Message | Cause | Solution |
| --- | --- | --- | --- |
| 1 | Token limit exceeded | Conversation history + system prompt exceeds model context window | Use sliding window or summary-compressed memory |
| 2 | Embedding dimension mismatch | Query vector dimension doesn't match stored vector dimension | Unify on a single Embedding model for all operations |
| 3 | Rate limit hit on vector DB | High-frequency queries trigger vector database rate limiting | Batch queries + local cache |
| 4 | Memory retrieval returns empty | Vector index not built or data not committed | Verify write commit and index refresh status |
| 5 | Context window too short for summary | Summary itself is too long, crowding out conversation space | Limit summary length, apply hierarchical compression |
| 6 | Knowledge graph cycle detected | Entity relations form a cycle | Detect cycles when adding relations and reject them |
| 7 | Stale memory causing wrong answers | Memory not updated, outdated information in use | Implement TTL mechanism and version numbering |
| 8 | Concurrent write conflict | Multiple Agent instances writing to memory simultaneously | Use optimistic locking or distributed locks |
| 9 | Embedding model timeout | Large batch of text for Embedding exceeds timeout | Process in batches, max 100 items per batch |
| 10 | Memory leak in long-running agent | Long-running Agent memory keeps growing | Periodically evict low-importance memories, enforce upper limits |
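For error 9, the batching fix can be sketched as below. The embedding call is injected as a function so the sketch runs offline; `batched`, `embed_all`, and `fake_embed_batch` are hypothetical names, and the limit of 100 items per batch comes from the table above rather than from any specific provider.

```python
from typing import Callable, Iterator


def batched(items: list, batch_size: int = 100) -> Iterator[list]:
    # Yield consecutive slices of at most batch_size items.
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def embed_all(texts: list, embed_batch: Callable[[list], list],
              batch_size: int = 100) -> list:
    vectors: list = []
    for chunk in batched(texts, batch_size):
        vectors.extend(embed_batch(chunk))  # one request per chunk
    return vectors


calls: list = []


def fake_embed_batch(chunk: list) -> list:
    # Stand-in for a real embedding request; records the batch size.
    calls.append(len(chunk))
    return [[float(len(text))] for text in chunk]


texts = [f"memory {i}" for i in range(250)]
vectors = embed_all(texts, fake_embed_batch)
print(calls, len(vectors))
```

Retries and backoff for a failed chunk would wrap the `embed_batch` call; they are left out to keep the sketch short.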

Advanced Optimization: 4 Key Techniques

1. Tiered Memory Storage Strategy

Hot-warm-cold tiering keeps frequently accessed memories fast while archiving stale data.

from enum import Enum
from datetime import datetime, timedelta


class MemoryTier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"


class TieredMemoryStorage:
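    def __init__(self):
        self.hot: list[dict] = []
        self.warm: list[dict] = []
        self.cold: list[dict] = []
        self._next_index = 0

    def add(self, memory: dict) -> int:
        now = datetime.now()
        memory["index"] = self._next_index  # id looked up by access()
        self._next_index += 1
        memory["tier"] = MemoryTier.HOT.value
        memory["access_count"] = 0
        memory["created_at"] = now
        memory["last_accessed"] = now  # rebalance() reads this field
        self.hot.append(memory)
        return memory["index"]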

    def access(self, index: int) -> dict | None:
        for tier in [self.hot, self.warm, self.cold]:
            for mem in tier:
                if mem.get("index") == index:
                    mem["access_count"] += 1
                    mem["last_accessed"] = datetime.now()
                    return mem
        return None

    def rebalance(self) -> None:
        now = datetime.now()
        for mem in self.hot[:]:
            if (now - mem["last_accessed"]) > timedelta(hours=1):
                self.hot.remove(mem)
                mem["tier"] = MemoryTier.WARM.value
                self.warm.append(mem)
        for mem in self.warm[:]:
            if (now - mem["last_accessed"]) > timedelta(days=7):
                self.warm.remove(mem)
                mem["tier"] = MemoryTier.COLD.value
                self.cold.append(mem)

2. Asynchronous Memory Writes

Decouple memory writes from the Agent's main reasoning loop to avoid blocking.

import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncMemoryWriter:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.pending.put(memory)

    async def flush(self) -> int:
        count = 0
        batch = []
        while not self.pending.empty():
            mem = await self.pending.get()
            batch.append(mem)
            count += 1
        if batch:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                self.executor, self._batch_write, batch
            )
        return count

    @staticmethod
    def _batch_write(batch: list[dict]) -> None:
        for mem in batch:
            db.insert(mem)

    async def start_periodic_flush(self, interval: float = 5.0):
        while True:
            await asyncio.sleep(interval)
            await self.flush()
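The writer above depends on an external `db` object. To try the queue-and-flush flow end to end, the sketch below injects the storage call as a blocking function and uses an in-memory list as the sink. `QueueWriter` and `sink` are hypothetical names for this illustration.

```python
import asyncio


class QueueWriter:
    def __init__(self, sink):
        self.sink = sink  # blocking callable that takes a list of dicts
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.pending.put(memory)

    async def flush(self) -> int:
        batch = []
        while not self.pending.empty():
            batch.append(self.pending.get_nowait())
        if batch:
            # Run the blocking sink in the default thread pool so the
            # event loop stays free for the agent's main work.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.sink, batch)
        return len(batch)


async def main() -> list:
    stored: list = []
    writer = QueueWriter(sink=stored.extend)
    for i in range(3):
        await writer.write({"content": f"memory {i}"})
    flushed = await writer.flush()
    print(f"flushed {flushed} items")
    return stored


stored = asyncio.run(main())
print(stored)
```

Anything still queued when the process exits is lost in this sketch, so a production version would flush on shutdown as well as on a timer.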

3. Memory Quality Scoring and Auto-Eviction

Score memories using a composite of recency, frequency, importance, and relevance — then evict when scores drop below threshold.

from datetime import datetime, timedelta


class MemoryQualityScorer:
    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        self.decay_rate = decay_rate
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        age_hours = (datetime.now() - memory.get(
            "created_at", datetime.now()
        )).total_seconds() / 3600
        return self.decay_rate ** age_hours

    def _frequency_score(self, memory: dict) -> float:
        count = memory.get("access_count", 0)
        return min(1.0, count / 10.0)

    def should_evict(self, memory: dict) -> bool:
        return self.score(memory) < self.min_score
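Working through the weights shows a property worth knowing before relying on auto-eviction. The sketch below restates the same formula as a standalone function (`composite_score` is a hypothetical name) and evaluates a week-old, never-accessed memory.

```python
def composite_score(age_hours: float, access_count: int,
                    importance: float = 0.5, relevance: float = 0.5,
                    decay_rate: float = 0.95) -> float:
    recency = decay_rate ** age_hours
    frequency = min(1.0, access_count / 10.0)
    return (0.3 * recency + 0.2 * frequency +
            0.3 * importance + 0.2 * relevance)


MIN_SCORE = 0.1

week_old_default = composite_score(age_hours=24 * 7, access_count=0)
week_old_low = composite_score(age_hours=24 * 7, access_count=0,
                               importance=0.1, relevance=0.1)
print(week_old_default >= MIN_SCORE)  # default values never fall below the threshold
print(week_old_low < MIN_SCORE)       # low importance and relevance do
```

With the default importance and relevance of 0.5, the score can never drop below 0.3 * 0.5 + 0.2 * 0.5 = 0.25, which is above the 0.1 threshold. Eviction therefore only triggers for memories that were explicitly scored low, so either the threshold or the defaults may need adjusting.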

4. Memory Versioning for Consistency

Track memory versions to handle updates and resolve conflicts in multi-Agent environments.

from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class VersionedMemory:
    key: str
    value: str
    version: int = 1
    updated_at: datetime = field(default_factory=datetime.now)
    updated_by: str = ""


class VersionedMemoryStore:
    def __init__(self):
        self.store: dict[str, VersionedMemory] = {}

    def write(self, key: str, value: str, agent_id: str,
              expected_version: int | None = None) -> bool:
        existing = self.store.get(key)
        if existing and expected_version is not None:
            if existing.version != expected_version:
                return False
        new_version = (existing.version + 1) if existing else 1
        self.store[key] = VersionedMemory(
            key=key, value=value, version=new_version,
            updated_at=datetime.now(), updated_by=agent_id,
        )
        return True

    def read(self, key: str) -> VersionedMemory | None:
        return self.store.get(key)
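Optimistic locking is only useful if callers re-read and retry after a rejected write. The following sketch shows that loop against a deliberately tiny store; `TinyVersionedStore` and `append_with_retry` are hypothetical names, and the store here always requires an expected version, unlike the optional check above.

```python
class TinyVersionedStore:
    def __init__(self):
        self.data: dict = {}  # key -> (value, version)

    def read(self, key: str) -> tuple:
        return self.data.get(key, ("", 0))

    def write(self, key: str, value: str, expected_version: int) -> bool:
        _, current = self.read(key)
        if current != expected_version:
            return False  # someone else wrote first
        self.data[key] = (value, current + 1)
        return True


def append_with_retry(store: TinyVersionedStore, key: str, suffix: str,
                      max_attempts: int = 3) -> bool:
    for _ in range(max_attempts):
        value, version = store.read(key)  # re-read the latest state
        if store.write(key, value + suffix, expected_version=version):
            return True
    return False


store = TinyVersionedStore()
store.write("prefs", "python", expected_version=0)        # now version 1
_, stale = store.read("prefs")                            # agent A reads version 1
store.write("prefs", "python,rust", expected_version=1)   # agent B wins, version 2
print(store.write("prefs", "python,go", expected_version=stale))  # stale write rejected
print(append_with_retry(store, "prefs", ",go"))
print(store.read("prefs"))
```

The stale write would have silently dropped agent B's update; the retry path merges into the latest value instead. This sketch is single-process only, so a shared deployment would need the same compare-and-set done atomically by the database.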

Comparison Analysis: All 7 Memory Patterns

| Dimension | Buffer | Sliding Window | Summary Compressed | Vector Semantic | Episodic | Knowledge Graph | Hybrid Hierarchical |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Implementation Complexity | ★☆☆ | ★☆☆ | ★★☆ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| Token Efficiency | ★☆☆ | ★★☆ | ★★★ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| Retrieval Precision | ★★☆ | ★★☆ | ★★☆ | ★★★★ | ★★★ | ★★★★ | ★★★★★ |
| Multi-hop Reasoning | | | | | | | |
| Cross-session | | | | | | | |
| Write Latency | ~1ms | ~1ms | ~500ms | ~50ms | ~10ms | ~10ms | ~100ms |
| Storage Cost | Low | Low | Low | Medium | Medium | Medium | High |
| Scale Suitability | <20 turns | <50 turns | <200 turns | 10K+ items | 1K+ items | 10K+ entities | Unlimited |
| Typical Use Case | Simple Q&A | Code debugging | Consulting chat | RAG-enhanced | Event tracking | Knowledge reasoning | Enterprise assistant |

More ★ means a higher rating on that dimension (for Implementation Complexity, more ★ means more complex); ✓ supported, △ partially supported, ✗ not supported


Summary & Outlook

AI Agent memory systems are shifting from "nice-to-have" to "must-have." Key trends for 2026 and beyond:

  1. Native Memory Support: Frameworks like LangGraph Memory and MemGPT treat memory as a first-class citizen
  2. Multimodal Memory: Not just text — Agents remember image, audio, and video context
  3. Federated Memory: Multiple Agents share a memory pool while respecting privacy boundaries
  4. Adaptive Compression: Dynamic compression granularity based on query intent
  5. Memory Auditing: Traceable write and recall logs for compliance requirements

The principle for choosing a memory solution: start simple, upgrade as needed. Begin with a sliding window to validate your flow. Hit a token bottleneck? Add summary compression. Need cross-session persistence? Deploy a vector database. Multi-hop reasoning required? Introduce a knowledge graph. Don't jump straight to hybrid hierarchical memory — it's the most powerful pattern, but also the most complex.

