[Ch 6] Memory Management for Production Agents
The SQLite checkpointer from Ch 4 persists conversation history — but that’s just one type of memory. Production agents need four distinct layers, each serving a different purpose and operating on a different time horizon.
The Four Memory Layers
| Layer | Storage | Scope | Limits |
|---|---|---|---|
| In-Context | Message list in RAM | Current LLM call | Context window size |
| Episodic | Database (SQLite / MongoDB) | Per thread_id | Disk space |
| Semantic | Vector store (FAISS / Qdrant) | All users / sessions | Embedding index size |
| Procedural | Source code (system prompt) | Every call | Prompt token budget |
Layer 1: In-Context Memory — Managing History Growth
As conversations grow, the message list eventually overflows the context window or becomes expensive. Two standard strategies:
Strategy A: Sliding Window (Trim)
Keep only the last N messages, discarding older ones:
```python
# agent/memory/trim.py
from langchain_core.messages import AnyMessage, SystemMessage, trim_messages
from langchain_openai import ChatOpenAI


def trim_conversation(
    messages: list[AnyMessage],
    max_tokens: int = 4000,
    model: str = "gpt-4o-mini",
) -> list[AnyMessage]:
    """Trim messages to stay within a token budget, always keeping the system message."""
    return trim_messages(
        messages,
        max_tokens=max_tokens,
        strategy="last",          # keep the most recent messages
        token_counter=ChatOpenAI(model=model),
        include_system=True,      # never trim the system message
        allow_partial=False,      # don't cut messages mid-way
    )
```
```python
# agent/nodes.py — use trim inside the agent node
from langchain_core.messages import SystemMessage

from .memory.trim import trim_conversation


def agent_node(state: AgentState) -> dict:
    trimmed = trim_conversation(state["messages"], max_tokens=4000)
    messages = [SystemMessage(content=SYSTEM_PROMPT)] + trimmed
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}
```
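LangChain's `trim_messages` handles the token accounting for you. To see the mechanics in isolation, here is a rough standard-library sketch; the 4-characters-per-token heuristic and the plain message dicts are illustrative only, not how the library counts tokens:

```python
# Minimal sketch of the sliding-window idea, without LangChain.
# Hypothetical helper: real token counts come from the model's tokenizer.

def estimate_tokens(text: str) -> int:
    """Rough heuristic: ~4 characters per token for English text."""
    return max(1, len(text) // 4)


def sliding_window(messages: list[dict], max_tokens: int) -> list[dict]:
    """Keep the system message plus the most recent messages that fit the budget."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    budget = max_tokens - sum(estimate_tokens(m["content"]) for m in system)
    kept: list[dict] = []
    for msg in reversed(rest):  # walk from newest to oldest
        cost = estimate_tokens(msg["content"])
        if budget - cost < 0:
            break
        budget -= cost
        kept.append(msg)
    return system + list(reversed(kept))


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "First question about the leave policy, quite long " * 10},
    {"role": "assistant", "content": "Answer one."},
    {"role": "user", "content": "Second question."},
]
window = sliding_window(history, max_tokens=20)
# The long early message is dropped; the system message survives.
```

Note that the window is built newest-first, which is why the system message must be handled separately: by arrival order it is the oldest message and would be the first one trimmed.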
Strategy B: Summarization
When trimming loses important early context (e.g., user goals set at the start), summarize instead:
```python
# agent/memory/summarize.py
from langchain_core.messages import AnyMessage, AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

summarizer = ChatOpenAI(model="gpt-4o-mini", temperature=0)

SUMMARY_PROMPT = """Summarize the following conversation in 2-3 sentences.
Focus on: decisions made, key facts established, and current task status.
Be concise — this summary will replace the conversation history."""


def summarize_messages(messages: list[AnyMessage]) -> str:
    """Produce a compact summary of a list of messages."""
    conversation_text = "\n".join(
        f"{msg.__class__.__name__}: {msg.content}"
        for msg in messages
        if msg.content  # skip tool-call-only messages
    )
    response = summarizer.invoke([
        SystemMessage(content=SUMMARY_PROMPT),
        HumanMessage(content=conversation_text),
    ])
    return response.content


def maybe_summarize(
    messages: list[AnyMessage],
    threshold: int = 20,
) -> list[AnyMessage]:
    """If history exceeds threshold messages, summarize the older half."""
    if len(messages) <= threshold:
        return messages
    midpoint = len(messages) // 2
    old_messages = messages[:midpoint]
    recent_messages = messages[midpoint:]
    summary = summarize_messages(old_messages)
    # Replace old messages with a single summary message
    summary_msg = AIMessage(
        content=f"[Conversation summary — earlier context]\n{summary}"
    )
    return [summary_msg] + recent_messages
```
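The list bookkeeping in `maybe_summarize` is easy to verify deterministically by stubbing out the LLM call. The stub below is illustrative only, standing in for `summarize_messages`:

```python
# Deterministic sketch of the split-and-summarize bookkeeping.
# stub_summarize is a hypothetical stand-in for the real LLM-backed summarizer.

def stub_summarize(messages: list[str]) -> str:
    return f"(summary of {len(messages)} earlier messages)"


def maybe_summarize_sketch(messages: list[str], threshold: int = 20) -> list[str]:
    """Same control flow as maybe_summarize, on plain strings."""
    if len(messages) <= threshold:
        return messages
    midpoint = len(messages) // 2
    # Older half collapses into one message; recent half survives verbatim
    return [stub_summarize(messages[:midpoint])] + messages[midpoint:]


history = [f"msg-{i}" for i in range(30)]
compacted = maybe_summarize_sketch(history, threshold=20)
# 30 messages -> 1 summary + 15 recent = 16 messages
```

Because each compaction roughly halves the list, history length oscillates between about half the threshold and the threshold itself rather than growing without bound.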
Layer 2: Episodic Memory — MongoDB Checkpointer
SQLite is great for development but not for production (no concurrent writes, no horizontal scaling). For production, use the MongoDB checkpointer:
```bash
pip install langgraph-checkpoint-mongodb motor
```
```python
# agent/graph_mongo.py
import os

from motor.motor_asyncio import AsyncIOMotorClient
from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSaver
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode

from .state import AgentState
from .nodes import agent_node, should_continue
from .tools import TOOLS

MONGO_URI = os.environ.get("MONGODB_URI", "mongodb://localhost:27017")
MONGO_DB = os.environ.get("MONGODB_DB", "agent_memory")


async def build_async_graph():
    """Build the agent graph with an async MongoDB checkpointer."""
    client = AsyncIOMotorClient(MONGO_URI)
    checkpointer = AsyncMongoDBSaver(client, db_name=MONGO_DB)

    graph = StateGraph(AgentState)
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(TOOLS))
    graph.set_entry_point("agent")
    graph.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
    graph.add_edge("tools", "agent")
    return graph.compile(checkpointer=checkpointer)
```
```python
# agent/main_async.py
import asyncio

from langchain_core.messages import HumanMessage

from .graph_mongo import build_async_graph


async def main():
    app = await build_async_graph()
    config = {"configurable": {"thread_id": "user-42-session-1"}}
    inputs = {"messages": [HumanMessage(content="List all my tasks")]}
    async for chunk, metadata in app.astream(inputs, config=config, stream_mode="messages"):
        if chunk.content and metadata.get("langgraph_node") == "agent":
            print(chunk.content, end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(main())
```
```bash
# .env.example (updated)
OPENAI_API_KEY=your-api-key-here
MONGODB_URI=mongodb://localhost:27017
MONGODB_DB=agent_memory
```
MongoDB vs SQLite at a glance:
| | SQLite | MongoDB |
|---|---|---|
| Setup | Zero-config | Requires running server |
| Concurrent writes | ❌ Single writer | ✅ Multi-writer |
| Horizontal scaling | ❌ File-based | ✅ Replica sets / Atlas |
| Production-ready | Dev only | ✅ Yes |
| LangGraph support | SqliteSaver | AsyncMongoDBSaver |
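Whichever backend you pick, the property the checkpointer gives you is the same: state is keyed by `thread_id`, so separate conversations never see each other's history, and an unknown `thread_id` starts fresh. A toy in-memory sketch of that contract (hypothetical, not the actual LangGraph checkpointer interface):

```python
# Toy illustration of the checkpointer contract: history is isolated per
# thread_id. (Hypothetical sketch, not the LangGraph BaseCheckpointSaver API.)

class InMemoryCheckpointer:
    def __init__(self) -> None:
        self._threads: dict[str, list[str]] = {}

    def put(self, thread_id: str, message: str) -> None:
        """Append a message to one thread's history."""
        self._threads.setdefault(thread_id, []).append(message)

    def get(self, thread_id: str) -> list[str]:
        """An unknown thread_id resumes from an empty history."""
        return self._threads.get(thread_id, [])


saver = InMemoryCheckpointer()
saver.put("user-42-session-1", "List all my tasks")
saver.put("user-42-session-1", "Here are your 3 open tasks ...")
saver.put("user-7-session-1", "What's the leave policy?")
```

This is why the `thread_id` naming scheme matters in production: encode both the user and the session (as in `user-42-session-1` above) so one user's threads never collide with another's.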
Layer 3: Semantic Memory — RAG with FAISS
Episodic memory stores what happened. Semantic memory stores what the agent knows — a searchable knowledge base. The agent retrieves relevant facts at query time rather than scanning the entire history.
```bash
pip install langchain-community faiss-cpu tiktoken
```
```python
# agent/memory/knowledge_base.py
import os

from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings

EMBED_MODEL = OpenAIEmbeddings(
    model="text-embedding-3-small",
    api_key=os.environ.get("OPENAI_API_KEY"),
)


# ── Build the index ───────────────────────────────────────────────────────────
def build_knowledge_base(documents: list[dict]) -> FAISS:
    """
    Build a FAISS vector store from a list of {content, metadata} dicts.
    In production, load these from a database or file store.
    """
    docs = [
        Document(page_content=d["content"], metadata=d.get("metadata", {}))
        for d in documents
    ]
    return FAISS.from_documents(docs, EMBED_MODEL)


# ── Example knowledge base ────────────────────────────────────────────────────
COMPANY_DOCS = [
    {
        "content": "The annual leave policy allows 20 days per year. Requests must be submitted 2 weeks in advance.",
        "metadata": {"source": "hr-policy", "section": "leave"},
    },
    {
        "content": "Expense reports must be filed within 30 days of the expense date. Receipts are required for amounts over $25.",
        "metadata": {"source": "finance-policy", "section": "expenses"},
    },
    {
        "content": "The on-call rotation schedule is updated every Monday. Check the team wiki for the current schedule.",
        "metadata": {"source": "engineering-policy", "section": "on-call"},
    },
    {
        "content": "All production deployments require approval from the tech lead and a passing CI pipeline.",
        "metadata": {"source": "engineering-policy", "section": "deployments"},
    },
]

knowledge_base = build_knowledge_base(COMPANY_DOCS)


# ── Save and load ─────────────────────────────────────────────────────────────
def save_index(index: FAISS, path: str = "faiss_index") -> None:
    index.save_local(path)


def load_index(path: str = "faiss_index") -> FAISS:
    return FAISS.load_local(path, EMBED_MODEL, allow_dangerous_deserialization=True)
```
```python
# agent/tools.py — RAG search as a tool
from pydantic import BaseModel, Field
from langchain_core.tools import tool

from .memory.knowledge_base import knowledge_base


class SearchKnowledgeInput(BaseModel):
    query: str = Field(description="The question or topic to search for in the knowledge base")
    k: int = Field(default=3, description="Number of relevant documents to retrieve (1-5)")


@tool("search_knowledge_base", args_schema=SearchKnowledgeInput)
def search_knowledge_base(query: str, k: int = 3) -> str:
    """Search the company knowledge base for policies, procedures, and guidelines."""
    results = knowledge_base.similarity_search_with_score(query, k=k)
    if not results:
        return "No relevant information found in the knowledge base."
    # Keep only results within the relevance threshold
    # (FAISS returns L2 distance — lower = more similar)
    relevant = [(doc, score) for doc, score in results if score <= 0.4]
    if not relevant:
        return "No sufficiently relevant results found."
    output_lines = [f"Found {len(relevant)} relevant document(s):\n"]
    for i, (doc, score) in enumerate(relevant, 1):
        source = doc.metadata.get("source", "unknown")
        output_lines.append(f"[{i}] (source: {source})\n{doc.page_content}")
    return "\n\n".join(output_lines)
```
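The 0.4 cutoff above is a distance, not a similarity score, which is easy to get backwards: passing results are *below* the threshold. A toy calculation with made-up 3-d vectors (real embeddings have hundreds of dimensions) shows the direction of the scale; note that FAISS's default flat index reports squared L2 distance:

```python
# Toy demonstration of L2 distance: lower = more similar.
# The 3-d vectors are illustrative stand-ins for real embedding vectors.

def l2_sq(a: list[float], b: list[float]) -> float:
    """Squared Euclidean (L2) distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


query    = [1.0, 0.0, 0.0]
near_doc = [0.9, 0.1, 0.0]   # close in embedding space -> small distance
far_doc  = [0.0, 0.0, 1.0]   # unrelated -> large distance

d_near = l2_sq(query, near_doc)   # ~0.02, passes a 0.4 threshold
d_far  = l2_sq(query, far_doc)    # 2.0, filtered out
```

If you switch to a store that returns cosine *similarity* instead (higher = more similar), the comparison must flip, so always check which convention your vector store uses before tuning a threshold.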
💡 FAISS vs Qdrant: FAISS is in-memory and zero-infrastructure — perfect for tutorials and small knowledge bases (< 100K documents). For production with millions of documents, filtered search per user, or real-time updates, use Qdrant or Pinecone instead.
Layer 4: Procedural Memory — The System Prompt
The system prompt is the most overlooked memory layer. It defines how the agent behaves on every single call — its persona, constraints, output format, and domain-specific rules. Things that never change belong here.
```python
# agent/nodes.py — dynamic system prompt
from datetime import datetime, timezone


def build_system_prompt(ctx: "AgentContext | None" = None) -> str:
    """Build the system prompt, optionally personalized with user context."""
    base = """You are a task management assistant for engineering teams.

CAPABILITIES:
- Create, update, list, and delete tasks
- Search the company knowledge base for policies
- Request human confirmation before destructive actions

OUTPUT FORMAT:
- Use concise, action-oriented language
- After tool calls, summarize results in 1-2 sentences
- Format task lists as bullet points with status indicators

CONSTRAINTS:
- Only manage tasks within the user's permission scope
- Never expose other users' task data
- Always cite the source when answering policy questions"""

    # Add dynamic context
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    base += f"\n\nCurrent date/time: {now}"
    if ctx:
        base += f"\nCurrent user: {ctx.user_id} (org: {ctx.org_id})"
        base += f"\nPermissions: {', '.join(ctx.permissions)}"
    return base
```
The procedural memory principle: if something is true for every call, put it in the system prompt. If it’s true for this call only, put it in the user message or retrieved context.
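This split can be checked mechanically: the static part should be byte-identical across calls, while per-call facts arrive alongside it. A tiny sketch of that invariant (the names and message-dict shape are illustrative only):

```python
# Sketch of the procedural-memory principle: static rules are identical on
# every call; per-call facts travel separately. (Hypothetical names.)

STATIC_RULES = "You are a task assistant. Never expose other users' data."


def build_prompt(user_message: str, retrieved_context: str = "") -> list[dict]:
    msgs = [{"role": "system", "content": STATIC_RULES}]
    if retrieved_context:  # true for this call only -> not baked into the rules
        msgs.append({"role": "system", "content": f"Context:\n{retrieved_context}"})
    msgs.append({"role": "user", "content": user_message})
    return msgs


call_a = build_prompt("List my tasks")
call_b = build_prompt("What's the leave policy?", retrieved_context="20 days/year")
# call_a[0] and call_b[0] are identical; only the per-call parts differ.
```

A practical side benefit: a byte-identical static prefix is what prompt caching keys on, so keeping per-call data out of it also reduces cost on providers that cache prompts.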
Putting It All Together
```python
# agent/nodes.py — full agent node with all memory layers integrated
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig

from .memory.trim import trim_conversation
from .memory.knowledge_base import search_knowledge_base


def agent_node(state: AgentState, config: RunnableConfig) -> dict:
    # Layer 4: Procedural memory — persona, constraints, dynamic context
    ctx_data = config.get("configurable", {}).get("agent_context")
    ctx = AgentContext(**ctx_data) if ctx_data else None
    system_msg = SystemMessage(content=build_system_prompt(ctx))

    # Layer 1: In-context memory with trimming
    trimmed_messages = trim_conversation(state["messages"], max_tokens=4000)

    messages = [system_msg] + trimmed_messages
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}
```
💡 Ollama note: Replace `OpenAIEmbeddings` with:

```python
from langchain_ollama import OllamaEmbeddings

EMBED_MODEL = OllamaEmbeddings(model="nomic-embed-text")
```

Pull the embedding model first:

```bash
ollama pull nomic-embed-text
```
Summary
| Memory Layer | Implementation | When to Use |
|---|---|---|
| In-Context | trim_messages() / summarization | Always — control token growth |
| Episodic | SqliteSaver (dev) / AsyncMongoDBSaver (prod) | Multi-turn conversations |
| Semantic | FAISS (dev) / Qdrant (prod) + @tool | Knowledge base Q&A, document retrieval |
| Procedural | System prompt (static + dynamic parts) | Persona, constraints, output format |
In the next chapter, we add observability — making every LLM call, tool execution, and token cost visible via Langfuse.
← Ch 5: Tools, Guardrails & Safety | Ch 7: Tracing with Langfuse →
