Python AI Agent Memory System: 7 Production Patterns for Long-Term Memory
Why Does Your AI Agent Keep "Forgetting"?
You spent three days fine-tuning your Agent, and on day one in production users complain: "I told you my preferences yesterday. Why did you forget again?" This is not an isolated case: memory is one of the hardest parts of taking an AI Agent to production in 2026. LLMs have no persistent memory by design, so every conversation starts from a blank slate. Short-term memory does not survive beyond a single session, while long-term memory faces three obstacles: slow retrieval, expensive storage, and poor consistency.
The harsh truth: many teams build an Agent with LangGraph, stuff conversation history into a list, and call it "memory." The result is token explosion, skyrocketing costs, truncated critical information, and an Agent that gets worse with every turn. Long-term memory for a Python Agent is not just a matter of adding a database; it requires a complete architecture design.
Core Concepts at a Glance
| Concept | Description | Typical Implementation |
|---|---|---|
| Short-term Memory | Contextual information within the current session | Conversation history list, sliding window |
| Long-term Memory | Knowledge and preferences persisted across sessions | Vector database, relational database |
| Episodic Memory | Recollection of specific events and experiences | Temporal index + vector retrieval |
| Semantic Memory | Structured understanding of concepts and knowledge | Knowledge graph, ontology |
| Working Memory | Temporary information during current reasoning steps | Scratchpad, ReAct observations |
| Vector Retrieval | Memory recall based on semantic similarity | Embedding + FAISS/Chroma |
| Memory Compression | Compressing lengthy history into summaries | LLM summarization, key information extraction |
Problem Analysis: 5 Major Challenges of AI Agent Memory Systems
| # | Challenge | Manifestation | Impact |
|---|---|---|---|
| 1 | Token Window Overflow | Conversation history exceeds model context length | Critical information truncated, Agent "forgets" |
| 2 | Insufficient Retrieval Precision | Vector search returns irrelevant memories | Agent makes decisions based on wrong information |
| 3 | Memory Consistency Conflicts | New and old memories contradict, no way to determine which is correct | Self-contradictory output, user trust collapses |
| 4 | Cold Start Problem | New users have no historical memory available | Poor personalization, low retention |
| 5 | Cost vs. Latency Tradeoff | Full-memory retrieval is slow and expensive | Response timeouts or API bill explosions |
These five problems are deeply interconnected: to solve token overflow you compress memory, compression causes information loss, and loss worsens retrieval precision. A production Agent memory architecture must address these issues systematically — not by treating symptoms in isolation.
Step-by-Step Implementation: 7 Memory Patterns
Pattern 1: ConversationBufferMemory
The simplest memory pattern — store all conversation history as-is. Suitable for short conversations.
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Message:
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)


class ConversationBufferMemory:
    def __init__(self, max_tokens: int = 4000):
        self.messages: list[Message] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str) -> None:
        self.messages.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.messages]

    def estimate_tokens(self) -> int:
        # Rough heuristic: about 4 characters per token for English text
        return sum(len(m.content) // 4 for m in self.messages)

    def is_overflow(self) -> bool:
        return self.estimate_tokens() > self.max_tokens

    def clear(self) -> None:
        self.messages.clear()


memory = ConversationBufferMemory(max_tokens=4000)
memory.add("user", "I prefer Python, please answer in Python")
memory.add("assistant", "Sure, I'll use Python for all responses")
memory.add("user", "Write a quicksort algorithm for me")
print(memory.get_context())
print(f"Token estimate: {memory.estimate_tokens()}, overflow: {memory.is_overflow()}")
Best for: Customer service bots, simple Q&A, conversations with fewer than 20 turns.
Pattern 2: SlidingWindowMemory
Keep only the most recent K turns, automatically discarding earlier history. Token usage is predictable, but early context is lost.
from collections import deque


class SlidingWindowMemory:
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        # One turn = user message + assistant message, hence window_size * 2.
        # deque(maxlen=...) discards the oldest item automatically on append.
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        self.buffer.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        return len(self.buffer)


window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    window_memory.add("user", f"Question #{i+1}")
    window_memory.add("assistant", f"Answer #{i+1}")
print(f"Window size: {window_memory.size()}")
print(f"Last 2 turns: {window_memory.get_recent(k=2)}")
Best for: Long conversations where only recent context matters, such as code debugging assistants.
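A fixed turn count keeps token usage predictable only when messages are of similar length. The following is a minimal sketch of a window bounded by an estimated token budget instead. The class name `TokenBudgetWindow` and the 4-characters-per-token estimate are assumptions for illustration, not part of any library.

```python
from collections import deque


def estimate_tokens(text: str) -> int:
    """Rough heuristic (assumption): about 4 characters per token."""
    return max(1, len(text) // 4)


class TokenBudgetWindow:
    """Hypothetical variant: evict oldest messages until under a token budget."""

    def __init__(self, max_tokens: int = 200):
        self.max_tokens = max_tokens
        self.buffer = deque()
        self.total_tokens = 0

    def add(self, role: str, content: str) -> None:
        self.buffer.append({"role": role, "content": content})
        self.total_tokens += estimate_tokens(content)
        # Always keep the newest message, even if it alone exceeds the budget
        while self.total_tokens > self.max_tokens and len(self.buffer) > 1:
            dropped = self.buffer.popleft()
            self.total_tokens -= estimate_tokens(dropped["content"])

    def get_context(self) -> list:
        return list(self.buffer)


window = TokenBudgetWindow(max_tokens=50)
for i in range(10):
    window.add("user", "x" * 40)  # each message is estimated at 10 tokens
print(len(window.get_context()), window.total_tokens)  # 5 50
```

For real deployments the estimate would normally be replaced by the tokenizer of the model in use.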
Pattern 3: SummaryCompressedMemory
Use an LLM to compress conversation history into a summary, preserving key information while drastically reducing token consumption. This is a core technique in LangGraph memory management.
from openai import OpenAI


class SummaryCompressedMemory:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        # Summarize everything except the last two messages, which stay verbatim
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in self.recent_messages[:-2]
        )
        prompt = (
            f"Compress the following conversation history into a concise summary, "
            f"preserving all key information, user preferences, and important decisions:\n\n"
            f"{conversation_text}\n\n"
            f"Existing summary: {self.summary}\n\nOutput the merged new summary:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        # content can be None; keep the previous summary in that case
        self.summary = response.choices[0].message.content or self.summary
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Conversation history summary: {self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context


# Requires a valid API key: each compression triggers a real API call
summary_memory = SummaryCompressedMemory(
    api_key="your-api-key", max_raw_messages=6
)
for i in range(8):
    summary_memory.add("user", f"I want to learn about Python feature #{i+1}")
    summary_memory.add("assistant", f"Python feature #{i+1} is...")
print(f"Context entries: {len(summary_memory.get_context())}")
Best for: Multi-turn deep conversations, consulting Agents that need long-term semantic retention.
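Because each compression is an LLM call, the compress-on-threshold flow is easier to test when the summarizer is injected rather than hard-wired. The sketch below shows that idea under stated assumptions: `PluggableSummaryMemory` and `fake_summarize` are hypothetical names, and the fake summarizer simply keeps user lines in place of a real model.

```python
from typing import Callable


class PluggableSummaryMemory:
    """Sketch: same compress-on-threshold flow, with the summarizer injected."""

    def __init__(self, summarize: Callable[[str, str], str],
                 max_raw_messages: int = 4):
        self.summarize = summarize  # (existing_summary, new_text) -> merged summary
        self.max_raw_messages = max_raw_messages
        self.summary = ""
        self.recent = []  # list of (role, content) tuples
        self.compressions = 0

    def add(self, role: str, content: str) -> None:
        self.recent.append((role, content))
        if len(self.recent) > self.max_raw_messages:
            # Fold everything except the last two messages into the summary
            old, self.recent = self.recent[:-2], self.recent[-2:]
            text = "\n".join(f"{r}: {c}" for r, c in old)
            self.summary = self.summarize(self.summary, text)
            self.compressions += 1


def fake_summarize(existing: str, new_text: str) -> str:
    # Stand-in for an LLM call: keep only the user lines
    kept = [ln for ln in new_text.splitlines() if ln.startswith("user:")]
    return "\n".join(filter(None, [existing, *kept]))


mem = PluggableSummaryMemory(fake_summarize, max_raw_messages=4)
for i in range(4):
    mem.add("user", f"question {i}")
    mem.add("assistant", f"answer {i}")
print(mem.compressions, len(mem.recent))  # 2 2
print(mem.summary)
```

In production the injected callable would wrap the chat-completion request, which also makes it straightforward to count or rate-limit summarization calls.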
Pattern 4: VectorSemanticMemory
Vectorize and store memories, then retrieve relevant ones via semantic similarity. This is the core implementation of vector database memory.
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MemoryItem:
    content: str
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)


class VectorSemanticMemory:
    def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
        self.embedding_dim = embedding_dim
        self.top_k = top_k
        self.memories: list[MemoryItem] = []

    def add(self, content: str, embedding: np.ndarray,
            metadata: dict | None = None) -> None:
        # Reject vectors from a different embedding model early
        if embedding.shape != (self.embedding_dim,):
            raise ValueError(
                f"Expected embedding of shape ({self.embedding_dim},), "
                f"got {embedding.shape}"
            )
        self.memories.append(MemoryItem(
            content=content,
            embedding=embedding,
            metadata=metadata or {},
        ))

    def search(self, query_embedding: np.ndarray,
               top_k: int | None = None) -> list[dict]:
        k = top_k or self.top_k
        if not self.memories:
            return []
        scores = []
        for mem in self.memories:
            # Cosine similarity; the epsilon guards against zero-norm vectors
            sim = float(np.dot(query_embedding, mem.embedding) /
                        (np.linalg.norm(query_embedding) *
                         np.linalg.norm(mem.embedding) + 1e-8))
            scores.append((sim, mem))
        scores.sort(key=lambda x: x[0], reverse=True)
        return [
            {
                "content": mem.content,
                "score": score,
                "timestamp": mem.timestamp.isoformat(),
                "metadata": mem.metadata,
            }
            for score, mem in scores[:k]
        ]

    def delete_old(self, before: datetime) -> int:
        original_len = len(self.memories)
        self.memories = [m for m in self.memories if m.timestamp >= before]
        return original_len - len(self.memories)


vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    # Random unit vectors stand in for real embeddings in this demo
    fake_embedding = np.random.randn(128)
    fake_embedding /= np.linalg.norm(fake_embedding)
    vector_memory.add(
        content=f"User preference record #{i+1}: likes Python and Rust",
        embedding=fake_embedding,
        metadata={"source": "chat", "turn": i},
    )
query = np.random.randn(128)
query /= np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for r in results:
    print(f"[{r['score']:.4f}] {r['content']}")
Best for: RAG-enhanced Agents, personalized recommendations, cross-session knowledge retrieval. For production, replace in-memory storage with Chroma or Milvus.
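To see semantic recall behave deterministically, it can help to run it on hand-picked vectors rather than random ones. The following is a dependency-free sketch, assuming tiny 2-dimensional toy embeddings and a hypothetical `top_k` helper; it uses `heapq.nlargest` so that only the best k candidates are kept instead of sorting every score.

```python
import heapq
import math


def cosine(a, b) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query, items, k=2, min_score=0.5):
    """items: list of (content, embedding). Returns (score, content), best first."""
    scored = ((cosine(query, emb), content) for content, emb in items)
    # Drop weak matches before ranking so irrelevant memories never reach the prompt
    relevant = (pair for pair in scored if pair[0] >= min_score)
    return heapq.nlargest(k, relevant, key=lambda pair: pair[0])


items = [
    ("likes Python", [1.0, 0.0]),
    ("likes Rust", [0.8, 0.6]),
    ("dislikes meetings", [0.0, 1.0]),
]
results = top_k([1.0, 0.0], items, k=2, min_score=0.5)
for score, content in results:
    print(f"[{score:.2f}] {content}")
```

The third item is orthogonal to the query (similarity 0), so it is filtered out by `min_score` regardless of `k`.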
Pattern 5: EpisodicMemory
Record specific events the Agent has experienced, supporting dual retrieval by time and semantics.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

import numpy as np


class EmotionTag(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


@dataclass
class Episode:
    event: str
    context: str
    timestamp: datetime = field(default_factory=datetime.now)
    emotion: EmotionTag = EmotionTag.NEUTRAL
    importance: float = 0.5
    embedding: np.ndarray | None = None


class EpisodicMemory:
    def __init__(self, max_episodes: int = 1000):
        self.episodes: list[Episode] = []
        self.max_episodes = max_episodes

    def record(self, event: str, context: str,
               emotion: EmotionTag = EmotionTag.NEUTRAL,
               importance: float = 0.5,
               embedding: np.ndarray | None = None) -> None:
        self.episodes.append(Episode(
            event=event, context=context, emotion=emotion,
            importance=importance, embedding=embedding,
        ))
        if len(self.episodes) > self.max_episodes:
            self._evict()

    def _evict(self) -> None:
        # Drop the least important episodes, but keep the survivors in
        # chronological order so get_recent() still returns the latest ones.
        keep = sorted(
            range(len(self.episodes)),
            key=lambda i: self.episodes[i].importance,
            reverse=True,
        )[:self.max_episodes]
        self.episodes = [self.episodes[i] for i in sorted(keep)]

    def recall_by_time(self, start: datetime,
                       end: datetime) -> list[Episode]:
        return [
            ep for ep in self.episodes
            if start <= ep.timestamp <= end
        ]

    def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
        return [ep for ep in self.episodes if ep.importance >= threshold]

    def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
        return [ep for ep in self.episodes if ep.emotion == emotion]

    def get_recent(self, k: int = 5) -> list[Episode]:
        return self.episodes[-k:]


episodic_mem = EpisodicMemory(max_episodes=100)
episodic_mem.record(
    event="User reported slow API response",
    context="User calling /v2/predict endpoint during peak hours",
    emotion=EmotionTag.NEGATIVE,
    importance=0.9,
)
episodic_mem.record(
    event="User completed first deployment",
    context="Successfully deployed using Docker Compose",
    emotion=EmotionTag.POSITIVE,
    importance=0.7,
)
important = episodic_mem.recall_by_importance(0.8)
print(f"Important events: {len(important)}")
for ep in important:
    print(f"  [{ep.emotion.value}] {ep.event}")
Best for: Customer service Agents logging complaints, ops Agents tracking incident events.
Pattern 6: KnowledgeGraphMemory
Store entities and relationships in a graph structure, enabling multi-hop reasoning. Of the single-purpose patterns, this is the strongest for relational questions.
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class Entity:
    name: str
    entity_type: str
    properties: dict = field(default_factory=dict)


@dataclass
class Relation:
    source: str
    target: str
    relation_type: str
    properties: dict = field(default_factory=dict)


class KnowledgeGraphMemory:
    def __init__(self):
        self.entities: dict[str, Entity] = {}
        self.relations: list[Relation] = []
        self._adjacency: dict[str, list[Relation]] = defaultdict(list)

    def add_entity(self, name: str, entity_type: str,
                   properties: dict | None = None) -> Entity:
        entity = Entity(name=name, entity_type=entity_type,
                        properties=properties or {})
        self.entities[name] = entity
        return entity

    def add_relation(self, source: str, target: str,
                     relation_type: str,
                     properties: dict | None = None) -> Relation:
        relation = Relation(source=source, target=target,
                            relation_type=relation_type,
                            properties=properties or {})
        self.relations.append(relation)
        # Index the edge under both endpoints so traversal works in either direction
        self._adjacency[source].append(relation)
        if target != source:  # avoid indexing a self-loop twice
            self._adjacency[target].append(relation)
        return relation

    def get_entity(self, name: str) -> Entity | None:
        return self.entities.get(name)

    def get_relations_of(self, name: str) -> list[Relation]:
        return self._adjacency.get(name, [])

    def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
        # Breadth-first expansion; the visited set keeps cycles from looping
        visited = {start}
        current_level = {start}
        for _ in range(hops):
            next_level = set()
            for node in current_level:
                for rel in self._adjacency.get(node, []):
                    neighbor = rel.target if rel.source == node else rel.source
                    if neighbor not in visited:
                        next_level.add(neighbor)
                        visited.add(neighbor)
            current_level = next_level
        return visited

    def to_context_string(self, entity_name: str) -> str:
        entity = self.get_entity(entity_name)
        if not entity:
            return ""
        lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
        for rel in self.get_relations_of(entity_name):
            if rel.source == entity_name:
                line = f"  - {rel.relation_type} -> {rel.target} {rel.properties}"
            else:
                # Incoming edge: show the direction explicitly
                line = f"  - {rel.relation_type} <- {rel.source} {rel.properties}"
            lines.append(line)
        return "\n".join(lines)


kg = KnowledgeGraphMemory()
kg.add_entity("Alice", "User", {"preferred_language": "Python", "level": "senior"})
kg.add_entity("FastAPI", "Framework", {"type": "Web", "version": "0.115"})
kg.add_entity("Docker", "Tool", {"type": "Containerization"})
kg.add_relation("Alice", "FastAPI", "uses", {"frequency": "daily"})
kg.add_relation("Alice", "Docker", "uses", {"frequency": "weekly"})
kg.add_relation("FastAPI", "Docker", "deployment_method")
print(kg.to_context_string("Alice"))
print(f"2-hop connections: {kg.multi_hop_query('Alice', hops=2)}")
Best for: Complex Agents requiring multi-hop reasoning, such as enterprise knowledge assistants and medical diagnosis Agents.
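Multi-hop reasoning often needs the chain of entities that connects two nodes, not just the set of reachable ones. Here is a minimal breadth-first sketch that returns such a chain. The `shortest_path` helper, the edge-tuple layout, and the `Kubernetes` entity with its `runs_on` relation are assumptions added for illustration.

```python
from collections import defaultdict, deque
from typing import Optional


def shortest_path(edges, start: str, goal: str) -> Optional[list]:
    """edges: (source, relation_type, target) tuples, traversed as undirected."""
    adjacency = defaultdict(list)
    for source, _relation, target in edges:
        adjacency[source].append(target)
        adjacency[target].append(source)
    parents = {start: None}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            path = []
            while node is not None:  # walk parent links back to the start
                path.append(node)
                node = parents[node]
            return path[::-1]
        for neighbor in adjacency[node]:
            if neighbor not in parents:  # visited check, also safe on cycles
                parents[neighbor] = node
                queue.append(neighbor)
    return None


edges = [
    ("Alice", "uses", "FastAPI"),
    ("Alice", "uses", "Docker"),
    ("FastAPI", "deployment_method", "Docker"),
    ("Docker", "runs_on", "Kubernetes"),  # hypothetical extra relation
]
print(shortest_path(edges, "Alice", "Kubernetes"))  # ['Alice', 'Docker', 'Kubernetes']
print(shortest_path(edges, "Alice", "Nobody"))      # None
```

The Alice, FastAPI, Docker triangle is a cycle, and the traversal still terminates because each node is enqueued at most once.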
Pattern 7: HybridHierarchicalMemory
Fuse all the above patterns into a layered memory system. This is the ultimate solution for production Agent memory.
from dataclasses import dataclass

import numpy as np

# Builds on SlidingWindowMemory, VectorSemanticMemory, EpisodicMemory and
# KnowledgeGraphMemory from the earlier patterns.


@dataclass
class MemoryConfig:
    short_term_window: int = 10
    summary_threshold: int = 8   # reserved for a summary layer; not used below
    vector_top_k: int = 5
    kg_max_hops: int = 2         # reserved for multi-hop expansion; not used below
    importance_threshold: float = 0.7


class HybridHierarchicalMemory:
    def __init__(self, config: MemoryConfig | None = None):
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        self.short_term.add(role, content)
        # Only sufficiently important messages are promoted to episodic memory
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str) -> None:
        self.knowledge_graph.add_relation(source, target, relation_type)

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        context_parts = []
        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })
        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })
        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })
        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context


hybrid = HybridHierarchicalMemory(MemoryConfig(
    short_term_window=5, summary_threshold=6,
    vector_top_k=3, importance_threshold=0.6,
))
hybrid.add_message("user", "Help me build a RAG service with FastAPI", importance=0.8)
hybrid.add_message("assistant", "Sure, let me design the architecture", importance=0.3)
hybrid.add_knowledge("FastAPI", "Framework", {"async": True})
hybrid.add_knowledge("RAG", "Architecture", {"type": "Retrieval-Augmented Generation"})
hybrid.add_knowledge_relation("FastAPI", "RAG", "implementation_framework")
ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    print(f"[{part['layer']}] {str(part['content'])[:100]}")
Best for: Enterprise-grade AI assistants, complex Agent systems requiring full-pipeline memory.
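A layered retrieval result still has to be flattened into a prompt without exceeding the context budget. One possible approach is sketched below, assuming the `{"layer": ..., "content": ...}` shape used above; the `render_context` helper, the layer priority order, and the character budget are illustrative assumptions rather than a prescribed design.

```python
# Assumed priority: earlier layers are placed first and are the last to be dropped
LAYER_ORDER = ["short_term", "summary", "semantic", "episodic", "knowledge_graph"]


def render_context(parts: list, max_chars: int = 2000) -> str:
    """Flatten layered retrieval results into one string under a size budget."""
    by_layer = {p["layer"]: p["content"] for p in parts}
    sections = []
    used = 0
    for layer in LAYER_ORDER:
        if layer not in by_layer:
            continue
        section = f"## {layer}\n{by_layer[layer]}"
        if used + len(section) > max_chars:
            continue  # skip a layer that does not fit; smaller ones may still fit
        sections.append(section)
        used += len(section)
    return "\n\n".join(sections)


parts = [
    {"layer": "short_term", "content": "hi"},
    {"layer": "semantic", "content": "x" * 100},
]
print(render_context(parts, max_chars=50))    # only the short_term section fits
print(len(render_context(parts, max_chars=2000)))
```

A character budget is a crude proxy; a token count from the target model's tokenizer would be the more faithful limit.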
Pitfall Guide: 5 Common Traps
Pitfall 1: Storing Conversation History Without Limits
❌ Wrong:
class BadMemory:
    def __init__(self):
        self.history = []

    def add(self, msg: str):
        self.history.append(msg)  # grows without bound
✅ Correct:
class GoodMemory:
    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

    def _compress_old(self):
        half = len(self.history) // 2
        # The older half is simply dropped here; in practice, summarize or
        # archive it (see Pattern 3) before discarding.
        self.history = self.history[half:]
Pitfall 2: No Similarity Threshold Filtering in Vector Search
❌ Wrong:
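results = vector_store.similarity_search(query, k=5)
context = ""
for r in results:
    context += r.page_content  # every hit is used, however weak the match
✅ Correct:
SIMILARITY_THRESHOLD = 0.75
# Assuming a LangChain-style vector store: similarity_search_with_score returns
# raw, store-specific scores (often a distance, where lower is better), so
# thresholding on it is unreliable. similarity_search_with_relevance_scores
# normalizes scores to [0, 1], where higher means more relevant.
results = vector_store.similarity_search_with_relevance_scores(query, k=10)
filtered = [(doc, score) for doc, score in results if score >= SIMILARITY_THRESHOLD]
context = ""
for doc, score in filtered[:5]:
    context += doc.page_content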
Pitfall 3: No Deduplication on Memory Writes
❌ Wrong:
def save_memory(content: str):
    db.insert({"content": content})  # identical content is stored again and again
✅ Correct:
import hashlib
from datetime import datetime

# db is a placeholder for a MongoDB-style storage client
def save_memory(content: str, metadata: dict | None = None):
    # MD5 is used only as a content fingerprint here, not for security
    content_hash = hashlib.md5(content.encode()).hexdigest()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": datetime.now()}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
        })
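Exact-hash deduplication misses copies that differ only in case or whitespace. A minimal in-memory sketch of normalizing before hashing follows; `fingerprint` and `DedupStore` are hypothetical names, and SHA-256 is swapped in for MD5 purely as an illustrative choice.

```python
import hashlib


def fingerprint(content: str) -> str:
    # Collapse whitespace and case so trivially different copies hash the same
    normalized = " ".join(content.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


class DedupStore:
    def __init__(self):
        self.items = {}  # fingerprint -> {"content": ..., "seen": ...}

    def save(self, content: str) -> bool:
        """Return True if a new memory was stored, False if it was a duplicate."""
        key = fingerprint(content)
        if key in self.items:
            self.items[key]["seen"] += 1
            return False
        self.items[key] = {"content": content, "seen": 1}
        return True


store = DedupStore()
print(store.save("I prefer Python"))        # True
print(store.save("  i prefer   python "))   # False
print(len(store.items))                     # 1
```

This catches only near-identical text; paraphrases ("Python is my favorite") would need an embedding-similarity check instead.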
Pitfall 4: Ignoring Temporal Decay of Memories
❌ Wrong:
all_memories = db.get_all()
context = "\n".join(m["content"] for m in all_memories)  # old and new weigh the same
✅ Correct:
from datetime import datetime

def get_memories_with_decay(half_life_days: float = 30.0):
    now = datetime.now()
    memories = db.get_all()  # db is a placeholder for your storage client
    scored = []
    for m in memories:
        # Fractional days, so memories written on the same day still rank by age
        age_days = (now - m["created_at"]).total_seconds() / 86400
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [m for m, _ in scored[:10]]
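To make the half-life concrete, the decay weight is 0.5 raised to the power age / half_life, so a memory loses half its weight every `half_life_days`. The short worked example below uses a standalone `decay_weight` function, a name introduced here for illustration.

```python
def decay_weight(age_days: float, half_life_days: float = 30.0) -> float:
    return 0.5 ** (age_days / half_life_days)


for age in (0, 30, 60, 90):
    print(age, decay_weight(age))  # 1.0, 0.5, 0.25, 0.125

# An old, important memory versus a fresh, minor one
old_important = decay_weight(90) * 0.9   # 0.125 * 0.9 = 0.1125
fresh_minor = decay_weight(0) * 0.3      # 1.0 * 0.3 = 0.3
print(old_important < fresh_minor)       # True
```

With a 30-day half-life, a 90-day-old memory of importance 0.9 already ranks below a fresh memory of importance 0.3, so durable facts such as user preferences may warrant a longer half-life or no decay at all.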
Pitfall 5: Losing Critical Details During Summary Compression
❌ Wrong:
summary_prompt = "Summarize the following conversation: " + conversation_text
✅ Correct:
summary_prompt = (
    "Compress the following conversation history into a summary. "
    "You MUST preserve:\n"
    "1. Explicitly stated user preferences and requirements\n"
    "2. Important decisions and conclusions already made\n"
    "3. Specific numbers, names, and dates mentioned\n"
    "4. Unresolved or follow-up items\n\n"
    f"Conversation content:\n{conversation_text}"
)
Error Troubleshooting: 10 Common Errors
| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | Token limit exceeded | Conversation history + system prompt exceeds model context window | Use sliding window or summary-compressed memory |
| 2 | Embedding dimension mismatch | Query vector dimension doesn't match stored vector dimension | Unify on a single Embedding model for all operations |
| 3 | Rate limit hit on vector DB | High-frequency queries trigger vector database rate limiting | Batch queries + local cache |
| 4 | Memory retrieval returns empty | Vector index not built or data not committed | Verify write commit and index refresh status |
| 5 | Context window too short for summary | Summary itself is too long, crowding out conversation space | Limit summary length, apply hierarchical compression |
| 6 | Knowledge graph cycle detected | Entity relations form a cycle | Detect cycles when adding relations and reject them |
| 7 | Stale memory causing wrong answers | Memory not updated, outdated information in use | Implement TTL mechanism and version numbering |
| 8 | Concurrent write conflict | Multiple Agent instances writing to memory simultaneously | Use optimistic locking or distributed locks |
| 9 | Embedding model timeout | Large batch of text for Embedding exceeds timeout | Process in batches, max 100 items per batch |
| 10 | Memory leak in long-running agent | Long-running Agent memory keeps growing | Periodically evict low-importance memories, enforce upper limits |
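Two of the fixes in the table (rows 2 and 9) are small enough to sketch directly: validating the vector dimension before a write or query, and splitting embedding requests into batches of at most 100. The helper names `check_dimension` and `iter_batches` are assumptions for illustration.

```python
from typing import Iterator, Sequence


def check_dimension(vector: Sequence[float], expected_dim: int) -> None:
    """Fail fast instead of letting a mismatched vector reach the index."""
    if len(vector) != expected_dim:
        raise ValueError(
            f"Embedding dimension mismatch: expected {expected_dim}, got {len(vector)}"
        )


def iter_batches(items: Sequence[str], batch_size: int = 100) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


texts = [f"memory {i}" for i in range(250)]
print([len(batch) for batch in iter_batches(texts)])  # [100, 100, 50]

check_dimension([0.1, 0.2, 0.3], expected_dim=3)      # passes silently
try:
    check_dimension([0.1, 0.2], expected_dim=3)
except ValueError as exc:
    print(exc)
```

Each batch would then be sent to the embedding endpoint as a separate request, which keeps any single call well under its timeout.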
Advanced Optimization: 4 Key Techniques
1. Tiered Memory Storage Strategy
Hot-warm-cold tiering keeps frequently accessed memories fast while archiving stale data.
from enum import Enum
from datetime import datetime, timedelta


class MemoryTier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"


class TieredMemoryStorage:
    def __init__(self):
        self.hot: list[dict] = []
        self.warm: list[dict] = []
        self.cold: list[dict] = []
        self._next_index = 0

    def add(self, memory: dict) -> int:
        now = datetime.now()
        memory["index"] = self._next_index  # lookup key used by access()
        memory["tier"] = MemoryTier.HOT.value
        memory["access_count"] = 0
        memory["created_at"] = now
        memory["last_accessed"] = now       # rebalance() reads this field
        self._next_index += 1
        self.hot.append(memory)
        return memory["index"]

    def access(self, index: int) -> dict | None:
        for tier in [self.hot, self.warm, self.cold]:
            for mem in tier:
                if mem.get("index") == index:
                    mem["access_count"] += 1
                    mem["last_accessed"] = datetime.now()
                    return mem
        return None

    def rebalance(self) -> None:
        now = datetime.now()
        # Iterate over copies because items are removed during the loop
        for mem in self.hot[:]:
            if (now - mem["last_accessed"]) > timedelta(hours=1):
                self.hot.remove(mem)
                mem["tier"] = MemoryTier.WARM.value
                self.warm.append(mem)
        for mem in self.warm[:]:
            if (now - mem["last_accessed"]) > timedelta(days=7):
                self.warm.remove(mem)
                mem["tier"] = MemoryTier.COLD.value
                self.cold.append(mem)
2. Asynchronous Memory Writes
Decouple memory writes from the Agent's main reasoning loop to avoid blocking.
import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncMemoryWriter:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.pending.put(memory)

    async def flush(self) -> int:
        count = 0
        batch = []
        while not self.pending.empty():
            mem = await self.pending.get()
            batch.append(mem)
            count += 1
        if batch:
            # get_running_loop() is the supported call inside a coroutine
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                self.executor, self._batch_write, batch
            )
        return count

    @staticmethod
    def _batch_write(batch: list[dict]) -> None:
        # db is a placeholder for your blocking storage client
        for mem in batch:
            db.insert(mem)

    async def start_periodic_flush(self, interval: float = 5.0):
        while True:
            await asyncio.sleep(interval)
            await self.flush()
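The queue-and-flush idea can be exercised end to end without a database. The sketch below is one possible arrangement, assuming a plain list as the storage sink and a hypothetical `QueueWriter` class: a background task drains the queue in small batches, and `Queue.join()` lets the caller wait until everything queued has been written.

```python
import asyncio
import contextlib


class QueueWriter:
    """Sketch: a background task drains a queue and writes in batches."""

    def __init__(self, sink: list, batch_size: int = 3):
        self.sink = sink              # stand-in for a real database
        self.batch_size = batch_size
        self.queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.queue.put(memory)  # does not wait for the actual write

    async def run(self) -> None:
        while True:
            batch = [await self.queue.get()]
            while len(batch) < self.batch_size and not self.queue.empty():
                batch.append(self.queue.get_nowait())
            # Blocking I/O goes to a worker thread so the event loop stays free
            await asyncio.to_thread(self.sink.extend, batch)
            for _ in batch:
                self.queue.task_done()


async def main() -> list:
    sink = []
    writer = QueueWriter(sink)
    worker = asyncio.create_task(writer.run())
    for i in range(7):
        await writer.write({"id": i})
    await writer.queue.join()         # wait until every queued item is written
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker
    return sink


written = asyncio.run(main())
print([m["id"] for m in written])     # [0, 1, 2, 3, 4, 5, 6]
```

A single consumer preserves write order; adding more consumers raises throughput but gives up that ordering guarantee.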
3. Memory Quality Scoring and Auto-Eviction
Score memories using a composite of recency, frequency, importance, and relevance — then evict when scores drop below threshold.
from datetime import datetime


class MemoryQualityScorer:
    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        self.decay_rate = decay_rate  # multiplicative decay per hour
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        age_hours = (datetime.now() - memory.get(
            "created_at", datetime.now()
        )).total_seconds() / 3600
        return self.decay_rate ** age_hours

    def _frequency_score(self, memory: dict) -> float:
        count = memory.get("access_count", 0)
        return min(1.0, count / 10.0)  # saturates at 10 accesses

    def should_evict(self, memory: dict) -> bool:
        return self.score(memory) < self.min_score
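Working through the arithmetic of this weighting shows two properties worth knowing before tuning it. The sketch below reuses the same weights and defaults in a standalone `composite` function, a name introduced here for illustration.

```python
import math

WEIGHTS = {"recency": 0.3, "frequency": 0.2, "importance": 0.3, "relevance": 0.2}


def composite(recency: float, frequency: float,
              importance: float = 0.5, relevance: float = 0.5) -> float:
    return (WEIGHTS["recency"] * recency + WEIGHTS["frequency"] * frequency +
            WEIGHTS["importance"] * importance + WEIGHTS["relevance"] * relevance)


# With decay_rate = 0.95 per hour, recency halves roughly every 13.5 hours
half_life_hours = math.log(0.5) / math.log(0.95)
print(round(half_life_hours, 1))  # 13.5

# Lowest possible score for a memory that keeps the default importance/relevance
floor = composite(recency=0.0, frequency=0.0)
print(round(floor, 2))            # 0.25

# An explicitly low-value, stale, never-accessed memory does fall below 0.1
print(composite(0.0, 0.0, importance=0.1, relevance=0.1) < 0.1)  # True
```

Since the floor of 0.25 is above the default `min_score` of 0.1, memories without explicit importance and relevance values are never evicted by this scorer; either those fields need to be populated or the threshold raised.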
4. Memory Versioning for Consistency
Track memory versions to handle updates and resolve conflicts in multi-Agent environments.
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class VersionedMemory:
    key: str
    value: str
    version: int = 1
    updated_at: datetime = field(default_factory=datetime.now)
    updated_by: str = ""


class VersionedMemoryStore:
    def __init__(self):
        self.store: dict[str, VersionedMemory] = {}

    def write(self, key: str, value: str, agent_id: str,
              expected_version: int | None = None) -> bool:
        existing = self.store.get(key)
        # Optimistic check: reject the write if someone else updated the key first
        if existing and expected_version is not None:
            if existing.version != expected_version:
                return False
        new_version = (existing.version + 1) if existing else 1
        self.store[key] = VersionedMemory(
            key=key, value=value, version=new_version,
            updated_at=datetime.now(), updated_by=agent_id,
        )
        return True

    def read(self, key: str) -> VersionedMemory | None:
        return self.store.get(key)
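A rejected versioned write is normally followed by a read-modify-write retry. The following self-contained sketch shows that loop against a deliberately tiny stand-in store; `TinyVersionedStore` and `update_with_retry` are hypothetical names, and the retry count of 3 is an arbitrary assumption.

```python
class TinyVersionedStore:
    """Minimal stand-in for a versioned store (assumption, for illustration)."""

    def __init__(self):
        self.data = {}  # key -> (value, version)

    def read(self, key):
        return self.data.get(key, (None, 0))

    def write(self, key, value, expected_version) -> bool:
        _, current = self.read(key)
        if current != expected_version:
            return False  # someone else wrote first
        self.data[key] = (value, current + 1)
        return True


def update_with_retry(store, key, transform, max_attempts: int = 3) -> bool:
    """Re-read and re-apply the change until the version check passes."""
    for _ in range(max_attempts):
        value, version = store.read(key)
        if store.write(key, transform(value), expected_version=version):
            return True
    return False


store = TinyVersionedStore()
print(store.write("lang", "Python", expected_version=0))   # True
print(store.write("lang", "Go", expected_version=0))       # False, stale version
print(update_with_retry(store, "lang", lambda v: v + ", Rust"))  # True
print(store.read("lang"))                                  # ('Python, Rust', 2)
```

Applying the change as a function of the freshly read value, rather than writing a precomputed value, is what keeps a retry from overwriting another Agent's update.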
Comparison Analysis: All 7 Memory Patterns
| Dimension | Buffer | Sliding Window | Summary Compressed | Vector Semantic | Episodic | Knowledge Graph | Hybrid Hierarchical |
|---|---|---|---|---|---|---|---|
| Implementation Complexity | ★☆☆ | ★☆☆ | ★★☆ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| Token Efficiency | ★☆☆ | ★★☆ | ★★★ | ★★★ | ★★☆ | ★★★ | ★★★★ |
| Retrieval Precision | ★★☆ | ★★☆ | ★★☆ | ★★★★ | ★★★ | ★★★★ | ★★★★★ |
| Multi-hop Reasoning | ✗ | ✗ | ✗ | △ | ✗ | ✓ | ✓ |
| Cross-session | ✗ | ✗ | △ | ✓ | ✓ | ✓ | ✓ |
| Write Latency | ~1ms | ~1ms | ~500ms | ~50ms | ~10ms | ~10ms | ~100ms |
| Storage Cost | Low | Low | Low | Medium | Medium | Medium | High |
| Scale Suitability | <20 turns | <50 turns | <200 turns | 10K+ items | 1K+ items | 10K+ entities | Unlimited |
| Typical Use Case | Simple Q&A | Code debugging | Consulting chat | RAG-enhanced | Event tracking | Knowledge reasoning | Enterprise assistant |
More ★ = a higher rating on that dimension (for Implementation Complexity, more ★ means more complex, not better); ✓ supported △ partially supported ✗ not supported
Summary & Outlook
AI Agent memory systems are shifting from "nice-to-have" to "must-have." Key trends for 2026 and beyond:
- Native Memory Support: Frameworks like LangGraph Memory and MemGPT treat memory as a first-class citizen
- Multimodal Memory: Not just text — Agents remember image, audio, and video context
- Federated Memory: Multiple Agents share a memory pool while respecting privacy boundaries
- Adaptive Compression: Dynamic compression granularity based on query intent
- Memory Auditing: Traceable write and recall logs for compliance requirements
The principle for choosing a memory solution: start simple, upgrade as needed. Begin with a sliding window to validate your flow. Hit a token bottleneck? Add summary compression. Need cross-session persistence? Deploy a vector database. Multi-hop reasoning required? Introduce a knowledge graph. Don't jump straight to hybrid hierarchical memory — it's the most powerful pattern, but also the most complex.