Python AI Agent記憶システム:7つのプロダクション級長期記憶パターン

AI与大数据

あなたのAI Agentはなぜいつも「記憶喪失」になるのか?

3日かけて調整したAgentを本番投入した初日、ユーザーから「昨日伝えた設定はどこいった?また忘れたの?」とクレームが来る——これは決して珍しいことではありません。AI Agent記憶システムは、2026年のプロダクション運用における最大の課題です。大規模言語モデル自体には永続的な記憶がなく、毎回の対話は白紙の状態から始まります。短期記憶は1セッションしか持たず、長期記憶は検索が遅い、ストレージが高い、一貫性が悪いという三重苦に直面します。

さらに厄介なのは、多くのチームがLangGraphでAgentを構築し、会話履歴をリストに放り込むだけで記憶の実装が完了したと思い込んでいることです。結果としてトークンが爆発し、コストが跳ね上がり、重要な情報が切り捨てられ、Agentは会話を重ねるほど「バカ」になっていきます。Python Agentの長期記憶は、データベースを追加するだけでは解決しません。体系的なアーキテクチャ設計が必要です。


コア概念早見表

概念 説明 典型的な実装
短期記憶 現在のセッション内のコンテキスト情報 会話履歴リスト、スライディングウィンドウ
長期記憶 セッションをまたいで永続化される知識と嗜好 ベクトルデータベース、リレーショナルデータベース
エピソード記憶 特定の出来事や経験の記録 タイムインデックス+ベクトル検索
意味記憶 概念と知識の構造化された理解 ナレッジグラフ、オントロジー
ワーキングメモリ 現在の推論ステップにおける一時情報 スクラッチパッド、ReAct観察
ベクトル検索 意味的類似度に基づく記憶の再呼び出し Embedding+FAISS/Chroma
記憶圧縮 冗長な履歴を要約に圧縮 LLM要約、キー情報抽出

問題分析:AI Agent記憶システムの5つの主要課題

# 課題 具体的な症状 影響
1 トークンウィンドウオーバーフロー 会話履歴がモデルのコンテキスト長を超過 重要な情報が切り捨てられ、Agentが「記憶喪失」に
2 検索精度の不足 ベクトル検索が無関係な記憶を返す Agentが誤った情報に基づいて意思決定
3 記憶の一貫性衝突 新旧の記憶が矛盾し、どちらが正しいか判断できない 出力が矛盾し、ユーザーの信頼が崩壊
4 コールドスタート問題 新規ユーザーには利用可能な履歴記憶がない パーソナライズ体験が乏しく、定着率が低下
5 コストとレイテンシのトレードオフ 全量記憶検索は遅く、コストも高い レスポンスのタイムアウトやAPI料金の爆発

これら5つの問題は連鎖的に発生します。トークンオーバーフローを解決するために記憶を圧縮すると、情報が失われ、失われた情報が検索精度の問題をさらに悪化させます。プロダクション級のAgent記憶アーキテクチャは、これらの問題を体系的に解決しなければならず、その場しのぎの対応では不十分です。


ステップバイステップ実装:7つの記憶パターン

パターン1:会話バッファ記憶(ConversationBufferMemory)

最もシンプルな記憶パターン——すべての会話履歴をそのまま保存します。短い対話シナリオに適しています。

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class Message:
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)


class ConversationBufferMemory:
    def __init__(self, max_tokens: int = 4000):
        self.messages: list[Message] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str) -> None:
        self.messages.append(Message(role=role, content=content))

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.messages]

    def estimate_tokens(self) -> int:
        return sum(len(m.content) // 4 for m in self.messages)

    def is_overflow(self) -> bool:
        return self.estimate_tokens() > self.max_tokens

    def clear(self) -> None:
        self.messages.clear()


memory = ConversationBufferMemory(max_tokens=4000)
memory.add("user", "私はPythonが好きです。Pythonで答えてください")
memory.add("assistant", "承知しました。Pythonでお答えします")
memory.add("user", "クイックソートのアルゴリズムを書いて")

print(memory.get_context())
print(f"トークン推定: {memory.estimate_tokens()}, オーバーフロー: {memory.is_overflow()}")

適用シーン:カスタマーサポートボット、シンプルなQ&A、対話ターン数 < 20。


パターン2:スライディングウィンドウ記憶(SlidingWindowMemory)

直近のKターンのみを保持し、それより古い履歴は自動的に破棄します。トークンは制御可能ですが、初期の情報は失われます。

from collections import deque


class SlidingWindowMemory:
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.buffer: deque[Message] = deque(maxlen=window_size * 2)

    def add(self, role: str, content: str) -> None:
        self.buffer.append(Message(role=role, content=content))
        while len(self.buffer) > self.window_size * 2:
            self.buffer.popleft()

    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.buffer]

    def get_recent(self, k: int = 1) -> list[dict]:
        recent = list(self.buffer)[-k * 2:]
        return [{"role": m.role, "content": m.content} for m in recent]

    def size(self) -> int:
        return len(self.buffer)


window_memory = SlidingWindowMemory(window_size=5)
for i in range(10):
    window_memory.add("user", f"{i+1}番目の質問")
    window_memory.add("assistant", f"{i+1}番目の回答")

print(f"ウィンドウサイズ: {window_memory.size()}")
print(f"直近2ターン: {window_memory.get_recent(k=2)}")

適用シーン:長時間対話で直近のコンテキストのみが必要な場面、コードデバッグアシスタントなど。


パターン3:要約圧縮記憶(SummaryCompressedMemory)

LLMで会話履歴を要約に圧縮し、重要な情報を保持しつつトークン消費を大幅に削減します。これはLangGraph記憶管理の中核となるアプローチの一つです。

from openai import OpenAI


class SummaryCompressedMemory:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 max_raw_messages: int = 10):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.max_raw_messages = max_raw_messages
        self.summary: str = ""
        self.recent_messages: list[Message] = []

    def add(self, role: str, content: str) -> None:
        self.recent_messages.append(Message(role=role, content=content))
        if len(self.recent_messages) > self.max_raw_messages:
            self._compress()

    def _compress(self) -> None:
        conversation_text = "\n".join(
            f"{m.role}: {m.content}" for m in self.recent_messages[:-2]
        )
        prompt = (
            f"以下の会話履歴を簡潔な要約に圧縮してください。"
            f"すべての重要な情報、ユーザーの嗜好、重要な決定事項を保持してください:\n\n"
            f"{conversation_text}\n\n"
            f"既存の要約:{self.summary}\n\n統合した新しい要約を出力してください:"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        self.summary = response.choices[0].message.content
        self.recent_messages = self.recent_messages[-2:]

    def get_context(self) -> list[dict]:
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"会話履歴の要約:{self.summary}",
            })
        context.extend(
            {"role": m.role, "content": m.content}
            for m in self.recent_messages
        )
        return context


summary_memory = SummaryCompressedMemory(
    api_key="your-api-key", max_raw_messages=6
)
for i in range(8):
    summary_memory.add("user", f"Pythonの{i+1}番目の機能について知りたい")
    summary_memory.add("assistant", f"Pythonの{i+1}番目の機能は...")

print(f"コンテキスト項目数: {len(summary_memory.get_context())}")

適用シーン:多ターンの深い対話、コンサルティング型Agentなど、長期的な意味を保持する必要がある場面。


パターン4:ベクトル意味記憶(VectorSemanticMemory)

記憶をベクトル化して保存し、意味的類似度によって関連する記憶を検索します。これはベクトルデータベース記憶の中核となる実装です。

import numpy as np
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MemoryItem:
    content: str
    embedding: np.ndarray
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)


class VectorSemanticMemory:
    def __init__(self, embedding_dim: int = 1536, top_k: int = 5):
        self.embedding_dim = embedding_dim
        self.top_k = top_k
        self.memories: list[MemoryItem] = []

    def add(self, content: str, embedding: np.ndarray,
            metadata: dict | None = None) -> None:
        self.memories.append(MemoryItem(
            content=content,
            embedding=embedding,
            metadata=metadata or {},
        ))

    def search(self, query_embedding: np.ndarray,
               top_k: int | None = None) -> list[dict]:
        k = top_k or self.top_k
        if not self.memories:
            return []
        scores = []
        for mem in self.memories:
            sim = float(np.dot(query_embedding, mem.embedding) /
                        (np.linalg.norm(query_embedding) *
                         np.linalg.norm(mem.embedding) + 1e-8))
            scores.append((sim, mem))
        scores.sort(key=lambda x: x[0], reverse=True)
        return [
            {
                "content": mem.content,
                "score": score,
                "timestamp": mem.timestamp.isoformat(),
                "metadata": mem.metadata,
            }
            for score, mem in scores[:k]
        ]

    def delete_old(self, before: datetime) -> int:
        original_len = len(self.memories)
        self.memories = [m for m in self.memories if m.timestamp >= before]
        return original_len - len(self.memories)


vector_memory = VectorSemanticMemory(embedding_dim=128, top_k=3)
for i in range(5):
    fake_embedding = np.random.randn(128)
    fake_embedding /= np.linalg.norm(fake_embedding)
    vector_memory.add(
        content=f"ユーザー嗜好記録{i+1}:PythonとRustが好き",
        embedding=fake_embedding,
        metadata={"source": "chat", "turn": i},
    )

query = np.random.randn(128)
query /= np.linalg.norm(query)
results = vector_memory.search(query, top_k=3)
for r in results:
    print(f"[{r['score']:.4f}] {r['content']}")

適用シーン:RAG拡張Agent、パーソナライズド推薦、セッション横断のナレッジ検索。本番環境ではChromaやMilvusでインメモリストレージを置き換えることを推奨します。


パターン5:エピソード記憶(EpisodicMemory)

Agentが経験した特定のイベントを記録し、時間と意味の二重検索をサポートします。

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class EmotionTag(Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


@dataclass
class Episode:
    event: str
    context: str
    timestamp: datetime = field(default_factory=datetime.now)
    emotion: EmotionTag = EmotionTag.NEUTRAL
    importance: float = 0.5
    embedding: np.ndarray | None = None


class EpisodicMemory:
    def __init__(self, max_episodes: int = 1000):
        self.episodes: list[Episode] = []
        self.max_episodes = max_episodes

    def record(self, event: str, context: str,
               emotion: EmotionTag = EmotionTag.NEUTRAL,
               importance: float = 0.5,
               embedding: np.ndarray | None = None) -> None:
        self.episodes.append(Episode(
            event=event, context=context, emotion=emotion,
            importance=importance, embedding=embedding,
        ))
        if len(self.episodes) > self.max_episodes:
            self._evict()

    def _evict(self) -> None:
        self.episodes.sort(key=lambda e: e.importance, reverse=True)
        self.episodes = self.episodes[:self.max_episodes]

    def recall_by_time(self, start: datetime,
                       end: datetime) -> list[Episode]:
        return [
            ep for ep in self.episodes
            if start <= ep.timestamp <= end
        ]

    def recall_by_importance(self, threshold: float = 0.7) -> list[Episode]:
        return [ep for ep in self.episodes if ep.importance >= threshold]

    def recall_by_emotion(self, emotion: EmotionTag) -> list[Episode]:
        return [ep for ep in self.episodes if ep.emotion == emotion]

    def get_recent(self, k: int = 5) -> list[Episode]:
        return self.episodes[-k:]


episodic_mem = EpisodicMemory(max_episodes=100)
episodic_mem.record(
    event="ユーザーがAPIレスポンスの遅さをフィードバック",
    context="ピーク時に/v2/predictエンドポイントを呼び出し",
    emotion=EmotionTag.NEGATIVE,
    importance=0.9,
)
episodic_mem.record(
    event="ユーザーが初回デプロイを完了",
    context="Docker Composeでデプロイに成功",
    emotion=EmotionTag.POSITIVE,
    importance=0.7,
)

important = episodic_mem.recall_by_importance(0.8)
print(f"重要イベント数: {len(important)}")
for ep in important:
    print(f"  [{ep.emotion.value}] {ep.event}")

適用シーン:カスタマーサポートAgentのユーザークレーム記録、運用Agentの障害イベント記録。


パターン6:ナレッジグラフ記憶(KnowledgeGraphMemory)

グラフ構造でエンティティとリレーションを保存し、マルチホップ推論をサポートします。これはAgent記憶アーキテクチャの中で最も強力なパターンです。

from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class Entity:
    name: str
    entity_type: str
    properties: dict = field(default_factory=dict)


@dataclass
class Relation:
    source: str
    target: str
    relation_type: str
    properties: dict = field(default_factory=dict)


class KnowledgeGraphMemory:
    def __init__(self):
        self.entities: dict[str, Entity] = {}
        self.relations: list[Relation] = []
        self._adjacency: dict[str, list[Relation]] = defaultdict(list)

    def add_entity(self, name: str, entity_type: str,
                   properties: dict | None = None) -> Entity:
        entity = Entity(name=name, entity_type=entity_type,
                        properties=properties or {})
        self.entities[name] = entity
        return entity

    def add_relation(self, source: str, target: str,
                     relation_type: str,
                     properties: dict | None = None) -> Relation:
        relation = Relation(source=source, target=target,
                            relation_type=relation_type,
                            properties=properties or {})
        self.relations.append(relation)
        self._adjacency[source].append(relation)
        self._adjacency[target].append(relation)
        return relation

    def get_entity(self, name: str) -> Entity | None:
        return self.entities.get(name)

    def get_relations_of(self, name: str) -> list[Relation]:
        return self._adjacency.get(name, [])

    def multi_hop_query(self, start: str, hops: int = 2) -> set[str]:
        visited = {start}
        current_level = {start}
        for _ in range(hops):
            next_level = set()
            for node in current_level:
                for rel in self._adjacency.get(node, []):
                    neighbor = rel.target if rel.source == node else rel.source
                    if neighbor not in visited:
                        next_level.add(neighbor)
                        visited.add(neighbor)
            current_level = next_level
        return visited

    def to_context_string(self, entity_name: str) -> str:
        entity = self.get_entity(entity_name)
        if not entity:
            return ""
        lines = [f"{entity.name}({entity.entity_type}): {entity.properties}"]
        for rel in self.get_relations_of(entity_name):
            other = rel.target if rel.source == entity_name else rel.source
            lines.append(
                f"  - {rel.relation_type} -> {other} {rel.properties}"
            )
        return "\n".join(lines)


kg = KnowledgeGraphMemory()
kg.add_entity("田中", "ユーザー", {"嗜好言語": "Python", "レベル": "上級"})
kg.add_entity("FastAPI", "フレームワーク", {"タイプ": "Web", "バージョン": "0.115"})
kg.add_entity("Docker", "ツール", {"タイプ": "コンテナ化"})
kg.add_relation("田中", "FastAPI", "使用", {"頻度": "毎日"})
kg.add_relation("田中", "Docker", "使用", {"頻度": "毎週"})
kg.add_relation("FastAPI", "Docker", "デプロイ方式")

print(kg.to_context_string("田中"))
print(f"2ホップ関連: {kg.multi_hop_query('田中', hops=2)}")

適用シーン:マルチホップ推論が必要な複雑なAgent、エンタープライズナレッジアシスタント、医療診断Agentなど。


パターン7:ハイブリッド階層記憶(HybridHierarchicalMemory)

上記のすべてのパターンを統合し、階層的に記憶を組織化します。これはプロダクション級Agent記憶の究極のソリューションです。

from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class MemoryConfig:
    short_term_window: int = 10
    summary_threshold: int = 8
    vector_top_k: int = 5
    kg_max_hops: int = 2
    importance_threshold: float = 0.7


class HybridHierarchicalMemory:
    def __init__(self, config: MemoryConfig | None = None):
        self.config = config or MemoryConfig()
        self.short_term: SlidingWindowMemory = SlidingWindowMemory(
            window_size=self.config.short_term_window
        )
        self.summary_cache: str = ""
        self.semantic_memories: VectorSemanticMemory = VectorSemanticMemory(
            top_k=self.config.vector_top_k
        )
        self.episodic_mem: EpisodicMemory = EpisodicMemory()
        self.knowledge_graph: KnowledgeGraphMemory = KnowledgeGraphMemory()

    def add_message(self, role: str, content: str,
                    importance: float = 0.5) -> None:
        self.short_term.add(role, content)
        if importance >= self.config.importance_threshold:
            self.episodic_mem.record(
                event=content, context=f"role={role}",
                importance=importance,
            )

    def add_semantic_memory(self, content: str,
                            embedding: np.ndarray,
                            metadata: dict | None = None) -> None:
        self.semantic_memories.add(content, embedding, metadata)

    def add_knowledge(self, entity_name: str, entity_type: str,
                      properties: dict | None = None) -> None:
        self.knowledge_graph.add_entity(
            entity_name, entity_type, properties
        )

    def add_knowledge_relation(self, source: str, target: str,
                               relation_type: str) -> None:
        self.knowledge_graph.add_relation(source, target, relation_type)

    def retrieve(self, query: str | None = None,
                 query_embedding: np.ndarray | None = None) -> list[dict]:
        context_parts = []
        short_term_ctx = self.short_term.get_context()
        if short_term_ctx:
            context_parts.append({
                "layer": "short_term",
                "content": short_term_ctx,
            })
        if self.summary_cache:
            context_parts.append({
                "layer": "summary",
                "content": self.summary_cache,
            })
        if query_embedding is not None:
            semantic_results = self.semantic_memories.search(query_embedding)
            if semantic_results:
                context_parts.append({
                    "layer": "semantic",
                    "content": semantic_results,
                })
        important_episodes = self.episodic_mem.recall_by_importance(
            self.config.importance_threshold
        )
        if important_episodes:
            context_parts.append({
                "layer": "episodic",
                "content": [
                    {"event": ep.event, "importance": ep.importance}
                    for ep in important_episodes[-5:]
                ],
            })
        return context_parts

    def get_full_context(self, query_embedding: np.ndarray | None = None,
                         focus_entity: str | None = None) -> list[dict]:
        context = self.retrieve(query_embedding=query_embedding)
        if focus_entity:
            kg_context = self.knowledge_graph.to_context_string(focus_entity)
            if kg_context:
                context.append({
                    "layer": "knowledge_graph",
                    "content": kg_context,
                })
        return context


hybrid = HybridHierarchicalMemory(MemoryConfig(
    short_term_window=5, summary_threshold=6,
    vector_top_k=3, importance_threshold=0.6,
))
hybrid.add_message("user", "FastAPIでRAGサービスを構築したい", importance=0.8)
hybrid.add_message("assistant", "承知しました。アーキテクチャを設計します", importance=0.3)
hybrid.add_knowledge("FastAPI", "フレームワーク", {"非同期": True})
hybrid.add_knowledge("RAG", "アーキテクチャ", {"タイプ": "検索拡張生成"})
hybrid.add_knowledge_relation("FastAPI", "RAG", "実装フレームワーク")

ctx = hybrid.get_full_context(focus_entity="FastAPI")
for part in ctx:
    print(f"[{part['layer']}] {str(part['content'])[:100]}")

適用シーン:エンタープライズ級AIアシスタント、全リンク記憶が必要な複雑なAgentシステム。


落とし穴ガイド:5つのよくある罠

罠1:会話履歴を無制限に保存する

間違ったやり方

class BadMemory:
    def __init__(self):
        self.history = []

    def add(self, msg: str):
        self.history.append(msg)

正しいやり方

class GoodMemory:
    def __init__(self, max_messages: int = 50):
        self.history = []
        self.max_messages = max_messages

    def add(self, msg: str):
        self.history.append(msg)
        if len(self.history) > self.max_messages:
            self._compress_old()

    def _compress_old(self):
        old = self.history[:len(self.history) // 2]
        self.history = self.history[len(self.history) // 2:]

罠2:ベクトル検索で類似度閾値によるフィルタリングを行わない

間違ったやり方

results = vector_store.similarity_search(query, k=5)
for r in results:
    context += r.page_content

正しいやり方

SIMILARITY_THRESHOLD = 0.75

results = vector_store.similarity_search_with_score(query, k=10)
filtered = [r for r in results if r[1] >= SIMILARITY_THRESHOLD]
for doc, score in filtered[:5]:
    context += doc.page_content

罠3:記憶の書き込みで重複排除を行わない

間違ったやり方

def save_memory(content: str):
    db.insert({"content": content})

正しいやり方

import hashlib


def save_memory(content: str, metadata: dict | None = None):
    content_hash = hashlib.md5(content.encode()).hexdigest()
    existing = db.find_one({"content_hash": content_hash})
    if existing:
        db.update({"content_hash": content_hash},
                  {"$set": {"updated_at": datetime.now()}})
    else:
        db.insert({
            "content": content,
            "content_hash": content_hash,
            "metadata": metadata or {},
        })

罠4:記憶の時間減衰を無視する

間違ったやり方

all_memories = db.get_all()
context = "\n".join(m["content"] for m in all_memories)

正しいやり方

from datetime import datetime, timedelta


def get_memories_with_decay(half_life_days: float = 30.0):
    now = datetime.now()
    memories = db.get_all()
    scored = []
    for m in memories:
        age_days = (now - m["created_at"]).days
        decay = 0.5 ** (age_days / half_life_days)
        scored.append((m, decay * m.get("importance", 0.5)))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [m for m, s in scored[:10]]

罠5:要約圧縮で重要な詳細を失う

間違ったやり方

summary_prompt = "以下の会話を要約してください:" + conversation_text

正しいやり方

summary_prompt = (
    "以下の会話履歴を要約に圧縮してください。以下の内容は必ず保持してください:\n"
    "1. ユーザーが明確に表現した嗜好とニーズ\n"
    "2. 行われた重要な決定と結論\n"
    "3. 言及された具体的な数値、名前、日付\n"
    "4. 未解決またはフォローアップが必要な問題\n\n"
    f"会話内容:\n{conversation_text}"
)

エラートラブルシューティング:10のよくあるエラー

# エラーメッセージ 原因 解決策
1 Token limit exceeded 会話履歴+システムプロンプトがモデルのコンテキストウィンドウを超過 スライディングウィンドウまたは要約圧縮記憶を使用
2 Embedding dimension mismatch クエリベクトルと保存ベクトルの次元が一致しない 同一のEmbeddingモデルを統一して使用
3 Rate limit hit on vector DB 高頻度検索がベクトルデータベースのレート制限に抵触 バッチクエリ+ローカルキャッシュ
4 Memory retrieval returns empty ベクトルインデックスが未構築、またはデータが未書き込み 書き込みのコミット、インデックスのリフレッシュを確認
5 Context window too short for summary 要約自体が長すぎて対話スペースを圧迫 要約長を制限し、階層的圧縮を実装
6 Knowledge graph cycle detected エンティティ関係が循環を形成 関係追加時に循環を検出して拒否
7 Stale memory causing wrong answers 記憶が更新されず、期限切れの情報を使用 TTLメカニズムとバージョン管理を実装
8 Concurrent write conflict 複数のAgentインスタンスが同時に記憶に書き込み 楽観ロックまたは分散ロックを使用
9 Embedding model timeout 大批量テキストのEmbedding処理がタイムアウト バッチ処理、1バッチあたり100件以下に制限
10 Memory leak in long-running agent 長時間稼働のAgentでメモリが継続的に増加 低重要度記憶の定期クリーンアップ、上限設定

高度な最適化:3つの重要テクニック

1. 記憶階層型ストレージ戦略

from enum import Enum
from datetime import datetime, timedelta


class MemoryTier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"


class TieredMemoryStorage:
    def __init__(self):
        self.hot: list[dict] = []
        self.warm: list[dict] = []
        self.cold: list[dict] = []

    def add(self, memory: dict) -> None:
        memory["tier"] = MemoryTier.HOT.value
        memory["access_count"] = 0
        memory["created_at"] = datetime.now()
        self.hot.append(memory)

    def access(self, index: int) -> dict | None:
        for tier in [self.hot, self.warm, self.cold]:
            for mem in tier:
                if mem.get("index") == index:
                    mem["access_count"] += 1
                    mem["last_accessed"] = datetime.now()
                    return mem
        return None

    def rebalance(self) -> None:
        now = datetime.now()
        for mem in self.hot[:]:
            if (now - mem["last_accessed"]) > timedelta(hours=1):
                self.hot.remove(mem)
                mem["tier"] = MemoryTier.WARM.value
                self.warm.append(mem)
        for mem in self.warm[:]:
            if (now - mem["last_accessed"]) > timedelta(days=7):
                self.warm.remove(mem)
                mem["tier"] = MemoryTier.COLD.value
                self.cold.append(mem)

2. 非同期記憶書き込み

import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncMemoryWriter:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending: asyncio.Queue = asyncio.Queue()

    async def write(self, memory: dict) -> None:
        await self.pending.put(memory)

    async def flush(self) -> int:
        count = 0
        batch = []
        while not self.pending.empty():
            mem = await self.pending.get()
            batch.append(mem)
            count += 1
        if batch:
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                self.executor, self._batch_write, batch
            )
        return count

    @staticmethod
    def _batch_write(batch: list[dict]) -> None:
        for mem in batch:
            db.insert(mem)

    async def start_periodic_flush(self, interval: float = 5.0):
        while True:
            await asyncio.sleep(interval)
            await self.flush()

3. 記憶品質スコアリングと自動淘汰

from datetime import datetime, timedelta


class MemoryQualityScorer:
    def __init__(self, decay_rate: float = 0.95,
                 min_score: float = 0.1):
        self.decay_rate = decay_rate
        self.min_score = min_score

    def score(self, memory: dict) -> float:
        recency = self._recency_score(memory)
        frequency = self._frequency_score(memory)
        importance = memory.get("importance", 0.5)
        relevance = memory.get("relevance", 0.5)
        return (0.3 * recency + 0.2 * frequency +
                0.3 * importance + 0.2 * relevance)

    def _recency_score(self, memory: dict) -> float:
        age_hours = (datetime.now() - memory.get(
            "created_at", datetime.now()
        )).total_seconds() / 3600
        return self.decay_rate ** age_hours

    def _frequency_score(self, memory: dict) -> float:
        count = memory.get("access_count", 0)
        return min(1.0, count / 10.0)

    def should_evict(self, memory: dict) -> bool:
        return self.score(memory) < self.min_score

比較分析:7つの記憶パターンの全面的比較

次元 会話バッファ スライディングウィンドウ 要約圧縮 ベクトル意味 エピソード記憶 ナレッジグラフ ハイブリッド階層
実装複雑度 ★☆☆ ★☆☆ ★★☆ ★★★ ★★☆ ★★★ ★★★★
トークン効率 ★☆☆ ★★☆ ★★★ ★★★ ★★☆ ★★★ ★★★★
検索精度 ★★☆ ★★☆ ★★☆ ★★★★ ★★★ ★★★★ ★★★★★
マルチホップ推論
セッション横断
書き込みレイテンシ ~1ms ~1ms ~500ms ~50ms ~10ms ~10ms ~100ms
ストレージコスト
適用スケール <20ターン <50ターン <200ターン 10K+件 1K+件 10K+エンティティ 無制限
典型的なシーン シンプルQ&A コードデバッグ コンサルティング対話 RAG拡張 イベント追跡 ナレッジ推論 エンタープライズアシスタント

★が多いほどその次元で優秀;✓対応 △部分対応 ✗非対応


まとめと展望

AI Agent記憶システムは「あれば便利」から「不可欠」へと移行しています。2026年のトレンド:

  1. ネイティブ記憶サポート:LangGraph Memory、MemGPTなどのフレームワークが記憶を第一級市民として扱う
  2. マルチモーダル記憶:テキストだけでなく、画像、音声、動画のコンテキストも記憶する
  3. フェデレーテッド記憶:複数Agentが記憶プールを共有しつつ、プライバシー境界を保護する
  4. 適応型圧縮:クエリの意図に応じて記憶の圧縮粒度を動的に決定する
  5. 記憶監査:トレーサビリティのある記憶の書き込みと再呼び出しログで、コンプライアンス要件を満たす

記憶ソリューション選択の原則:シンプルなものから始め、必要に応じてアップグレードする。まずスライディングウィンドウでフローを通し、トークンのボトルネックにぶつかったら要約圧縮を追加し、セッション横断が必要になればベクトルデータベースを導入し、マルチホップ推論が必要になればナレッジグラフを取り入れる。最初からハイブリッド階層を構築しないこと——それは最も強力なソリューションであると同時に、最も複雑なソリューションでもあります。


オンラインツール推薦

ブラウザローカルツールを無料で試す →

#AI Agent#记忆系统#LangGraph#向量数据库#生产部署#Python#2026#AI与大数据