[Ch 4] Build Your First Production-Ready Agent
In Ch 3 we built a Hello World agent that worked for a single query. In this chapter we build the real thing: a multi-turn agent that remembers conversation history across sessions, handles tool errors gracefully, streams its output token by token, and can run multiple independent user threads simultaneously.
This is the foundation every subsequent chapter builds on.
Project Structure
agent/
├── .env.example
├── state.py # AgentState definition
├── tools.py # Tool definitions
├── nodes.py # Agent and tool nodes
├── graph.py # Graph assembly + compile
└── main.py # Entry point: run + stream
Installation
pip install langchain-core langchain-openai langgraph langgraph-checkpoint-sqlite python-dotenv
# .env.example
OPENAI_API_KEY=your-api-key-here
💡 Ollama: To run locally instead, install langchain-ollama and replace ChatOpenAI in nodes.py with:
from langchain_ollama import ChatOllama
llm = ChatOllama(model="llama3.2", temperature=0.1)
Step 1 — Define the State
The state is the single source of truth for everything the agent knows at any point in the loop.
# agent/state.py
from typing import Annotated, TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
# add_messages reducer: new messages are *appended*, not replaced
messages: Annotated[list[AnyMessage], add_messages]
Why add_messages matters: Without it, every node return would overwrite the entire message list. With it, returning {"messages": [new_msg]} appends new_msg to the existing history. This is what makes multi-turn conversations work.
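The reducer idea can be sketched without LangGraph at all: a state channel with a reducer merges each node's return into the existing value instead of overwriting it. Here is a minimal conceptual stand-in (not LangGraph's actual implementation; the real add_messages also merges and deduplicates by message ID, which this sketch omits):

```python
# Conceptual sketch of a reducer-backed state channel.
# The real add_messages reducer also matches messages by ID; this toy
# version only shows the append-instead-of-replace behavior.

def append_reducer(existing: list, update: list) -> list:
    """Combine the old channel value with a node's update by appending."""
    return existing + update

def apply_node_return(state: dict, node_return: dict) -> dict:
    """Merge a node's partial return into state using the channel's reducer."""
    merged = dict(state)
    for key, update in node_return.items():
        merged[key] = append_reducer(state.get(key, []), update)
    return merged

state = {"messages": ["user: hi"]}
state = apply_node_return(state, {"messages": ["ai: hello!"]})
print(state["messages"])  # both messages survive: history is appended, not replaced
```

Without the reducer, the second call would have left only ["ai: hello!"] in state, which is exactly the overwrite problem described above.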
Step 2 — Define the Tools
We’ll build a small task management assistant with three tools. Each uses a Pydantic schema for validated inputs.
# agent/tools.py
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from langchain_core.tools import tool
# ── In-memory task store (replace with a real DB in production) ──────────────
_tasks: dict[str, dict] = {}
# ── Input schemas ─────────────────────────────────────────────────────────────
class CreateTaskInput(BaseModel):
title: str = Field(description="Short title for the task")
description: str = Field(description="Detailed description of what needs to be done")
priority: str = Field(
default="medium",
description="Task priority: 'low', 'medium', or 'high'",
)
class UpdateTaskInput(BaseModel):
task_id: str = Field(description="The ID of the task to update")
status: str = Field(description="New status: 'todo', 'in_progress', or 'done'")
class ListTasksInput(BaseModel):
filter_status: str = Field(
default="all",
description="Filter by status: 'all', 'todo', 'in_progress', or 'done'",
)
# ── Tool definitions ──────────────────────────────────────────────────────────
@tool("create_task", args_schema=CreateTaskInput)
def create_task(title: str, description: str, priority: str = "medium") -> str:
"""Create a new task and return its ID."""
task_id = f"task-{len(_tasks) + 1:03d}"
_tasks[task_id] = {
"id": task_id,
"title": title,
"description": description,
"priority": priority,
"status": "todo",
"created_at": datetime.now(timezone.utc).isoformat(),
}
return f"Created task {task_id}: '{title}' (priority: {priority})"
@tool("update_task_status", args_schema=UpdateTaskInput)
def update_task_status(task_id: str, status: str) -> str:
"""Update the status of an existing task."""
if task_id not in _tasks:
return f"Error: Task '{task_id}' not found."
valid_statuses = {"todo", "in_progress", "done"}
if status not in valid_statuses:
return f"Error: Invalid status '{status}'. Use one of: {valid_statuses}"
_tasks[task_id]["status"] = status
return f"Updated {task_id} status to '{status}'."
@tool("list_tasks", args_schema=ListTasksInput)
def list_tasks(filter_status: str = "all") -> str:
"""List all tasks, optionally filtered by status."""
if not _tasks:
return "No tasks found."
tasks = list(_tasks.values())
if filter_status != "all":
tasks = [t for t in tasks if t["status"] == filter_status]
if not tasks:
return f"No tasks with status '{filter_status}'."
lines = [f"Found {len(tasks)} task(s):\n"]
for t in tasks:
lines.append(
f" [{t['id']}] {t['title']} | {t['priority'].upper()} | {t['status']}"
)
return "\n".join(lines)
TOOLS = [create_task, update_task_status, list_tasks]
Key practices shown here:
- Every tool has a Pydantic schema (args_schema=) — the LLM gets validated, typed inputs
- Error messages are returned as strings (not exceptions) — tools should never crash the agent loop
- The in-memory _tasks dict is a stand-in; swap in any real DB in production
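The "return errors as strings" rule generalizes into a wrapper you can put around any tool body. The helper below is a hypothetical pattern, not part of LangChain's API: it turns an exception into an error string, so a failed tool call becomes a result the LLM can read and react to rather than a crash that kills the agent loop.

```python
# Hypothetical safety wrapper (not a LangChain API): convert tool
# exceptions into error strings so the agent loop keeps running and
# the LLM can see what went wrong and try again.
import functools

def never_raise(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:  # broad on purpose: the LLM sees the message
            return f"Error: {type(exc).__name__}: {exc}"
    return wrapper

@never_raise
def divide(a: float, b: float) -> str:
    """A deliberately fragile example tool body."""
    return str(a / b)

print(divide(4, 2))  # "2.0"
print(divide(1, 0))  # "Error: ZeroDivisionError: division by zero"
```

The three tools above do this manually (the explicit "Error: ..." returns); a decorator like this is one way to enforce the rule uniformly as your tool count grows.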
Step 3 — Define the Nodes
# agent/nodes.py
import os
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI
from .state import AgentState
from .tools import TOOLS
SYSTEM_PROMPT = """You are a task management assistant. Help users create, update, and track their tasks.
When a user asks you to do something with tasks, use your tools. After tool calls, summarize
the result clearly. Always be concise and action-oriented."""
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.1,
api_key=os.environ.get("OPENAI_API_KEY"),
)
llm_with_tools = llm.bind_tools(TOOLS)
def agent_node(state: AgentState) -> dict:
"""Call the LLM with the full conversation history."""
messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
def should_continue(state: AgentState) -> str:
"""Route: call tools if the LLM made a tool call, otherwise finish."""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "end"
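Because should_continue only inspects the last message, its routing can be exercised without calling a model. Here is a quick check using stand-in message objects whose tool_calls attribute mimics the shape of LangChain's AIMessage:

```python
# Stand-in messages to exercise the routing logic without an LLM call.
from dataclasses import dataclass, field

@dataclass
class FakeAIMessage:
    content: str = ""
    tool_calls: list = field(default_factory=list)

def should_continue(state: dict) -> str:
    """Same routing rule as nodes.py, applied to the stand-in messages."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"

wants_tool = FakeAIMessage(tool_calls=[{"name": "create_task", "args": {}}])
plain_reply = FakeAIMessage(content="All done!")

print(should_continue({"messages": [wants_tool]}))   # tools
print(should_continue({"messages": [plain_reply]}))  # end
```

This kind of isolated check is cheap insurance: routing bugs are otherwise only visible as an agent that silently stops calling its tools.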
Step 4 — Assemble the Graph with a Checkpointer
The checkpointer is what makes the agent remember conversations across multiple calls. It persists the full AgentState after every step — keyed by a thread_id.
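Conceptually, a checkpointer is a keyed store of state snapshots. The toy in-memory version below shows the core contract ("state by thread"); it is a teaching sketch only, since the real SqliteSaver also records a checkpoint for every step and persists it durably to disk:

```python
# Toy checkpointer: a teaching sketch of the core contract.
# The real SqliteSaver also versions every step and writes to disk.
class ToyCheckpointer:
    def __init__(self):
        self._store: dict[str, dict] = {}

    def put(self, thread_id: str, state: dict) -> None:
        """Save the latest state snapshot for a thread."""
        self._store[thread_id] = state

    def get(self, thread_id: str) -> dict:
        """Restore a thread's state; unknown threads start fresh."""
        return self._store.get(thread_id, {"messages": []})

cp = ToyCheckpointer()
cp.put("user-alice-session-1", {"messages": ["user: hi", "ai: hello"]})

print(len(cp.get("user-alice-session-1")["messages"]))  # 2
print(cp.get("user-bob-session-1")["messages"])         # [] (fresh thread)
```

With this mental model in place, the real graph assembly is just a matter of handing the compiled graph a production-grade implementation of the same idea: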
# agent/graph.py
import sqlite3

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import ToolNode
from .state import AgentState
from .nodes import agent_node, should_continue
from .tools import TOOLS

def build_graph(db_path: str = "agent_memory.db"):
    """Build and compile the agent graph with SQLite persistence."""
    # SQLite checkpointer — persists state after every node execution.
    # Note: SqliteSaver.from_conn_string() returns a context manager in
    # recent langgraph-checkpoint-sqlite releases, so for a long-lived
    # app we construct the saver from a connection directly.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    checkpointer = SqliteSaver(conn)
graph = StateGraph(AgentState)
# Nodes
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(TOOLS))
# Edges
graph.set_entry_point("agent")
graph.add_conditional_edges(
"agent",
should_continue,
{"tools": "tools", "end": END},
)
graph.add_edge("tools", "agent")
return graph.compile(checkpointer=checkpointer)
The compiled graph is a loop: execution starts at agent; the conditional edge routes to tools whenever the last AI message contains tool calls; tools feeds its results back to agent; and the run ends once the agent replies without requesting a tool:

START → agent → (tool calls?) → tools → agent → ... → END
Step 5 — Run with Streaming
# agent/main.py
import os
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from .graph import build_graph
load_dotenv()
def chat(app, thread_id: str, user_input: str) -> None:
"""Send a message and stream the agent's response token by token."""
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content=user_input)]}
print(f"\nUser: {user_input}")
print("Assistant: ", end="", flush=True)
# stream_mode="messages" yields (message_chunk, metadata) tuples
for chunk, metadata in app.stream(inputs, config=config, stream_mode="messages"):
# Only print text from the AI (not tool results or empty chunks)
if (
chunk.content # has content
and metadata.get("langgraph_node") == "agent" # from the agent node
and not getattr(chunk, "tool_calls", None) # not a tool-call message
):
print(chunk.content, end="", flush=True)
print() # newline after streaming completes
def main():
app = build_graph()
# Each thread_id is a separate conversation with its own persisted history
thread_a = "user-alice-session-1"
thread_b = "user-bob-session-1"
# --- Thread A: Alice's conversation ---
chat(app, thread_a, "Create a high-priority task: Set up CI/CD pipeline")
chat(app, thread_a, "Also create a medium task: Write unit tests for auth module")
chat(app, thread_a, "Show me all my tasks")
# --- Thread B: Bob's separate conversation (no access to Alice's tasks) ---
chat(app, thread_b, "Create a task: Review Q1 budget report")
chat(app, thread_b, "Show me all tasks")
# --- Thread A resumes (state is restored from SQLite) ---
chat(app, thread_a, "Mark the CI/CD task as in_progress")
chat(app, thread_a, "Show me only in_progress tasks")
if __name__ == "__main__":
main()
Expected output (abbreviated):
User: Create a high-priority task: Set up CI/CD pipeline
Assistant: I've created the task for you! Created task task-001: 'Set up CI/CD
pipeline' (priority: high). What else would you like to do?
User: Show me all my tasks
Assistant: Here are your current tasks:
- [task-001] Set up CI/CD pipeline | HIGH | todo
- [task-002] Write unit tests for auth module | MEDIUM | todo
User: Mark the CI/CD task as in_progress
Assistant: Done! Updated task-001 status to 'in_progress'.
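The three-part filter inside chat() is the piece most worth isolating: per chunk, it decides whether text should reach the terminal. The same predicate written as a standalone function, exercised against stand-in chunks shaped like LangChain message chunks:

```python
# Stand-in for the streaming filter in chat(): keep only text chunks
# emitted by the "agent" node that are not tool-call messages.
from dataclasses import dataclass, field

@dataclass
class FakeChunk:
    content: str = ""
    tool_calls: list = field(default_factory=list)

def should_print(chunk, metadata: dict) -> bool:
    """True only for printable assistant text, mirroring chat()'s filter."""
    return bool(
        chunk.content                                   # has content
        and metadata.get("langgraph_node") == "agent"   # from the agent node
        and not getattr(chunk, "tool_calls", None)      # not a tool-call message
    )

print(should_print(FakeChunk(content="Hi"), {"langgraph_node": "agent"}))  # True
print(should_print(FakeChunk(content="42"), {"langgraph_node": "tools"}))  # False
print(should_print(FakeChunk(content="", tool_calls=[{}]), {"langgraph_node": "agent"}))  # False
```

Drop any one of the three conditions and the stream degrades: raw tool results interleave with prose, or empty tool-call chunks print as blank output.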
Understanding Multi-Thread Isolation
The thread_id in the config is the key to how LangGraph separates conversations:
# Thread isolation explained
config_alice = {"configurable": {"thread_id": "user-alice-session-1"}}
config_bob = {"configurable": {"thread_id": "user-bob-session-1"}}
# Each thread has completely separate state in SQLite:
# - Different message histories
# - The tools share the global `_tasks` dict here (because it's in-memory),
# but in production each user would have their own DB-scoped tasks
In a real system, you’d scope tool data to the user via thread_id or a user_id passed through config["configurable"] (covered in Ch 5).
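A minimal sketch of that scoping idea, assuming a per-user store keyed by user ID (the store and helper names here are hypothetical, not LangGraph API):

```python
# Hypothetical per-user task store: each user_id gets its own dict,
# so one user's tools can never see another user's tasks.
_tasks_by_user: dict[str, dict[str, dict]] = {}

def get_task_store(user_id: str) -> dict[str, dict]:
    """Return (creating if needed) the task dict scoped to this user."""
    return _tasks_by_user.setdefault(user_id, {})

alice_tasks = get_task_store("alice")
alice_tasks["task-001"] = {"title": "Set up CI/CD pipeline", "status": "todo"}

bob_tasks = get_task_store("bob")
print(len(alice_tasks), len(bob_tasks))  # 1 0 (stores are isolated)
```

Each tool would then call get_task_store(user_id) instead of touching a global dict, with user_id arriving via config["configurable"].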
Inspecting Persisted State
You can inspect the saved state for any thread at any time — useful for debugging:
# debug_state.py
from agent.graph import build_graph
app = build_graph()
# Get the latest state for a thread
config = {"configurable": {"thread_id": "user-alice-session-1"}}
state = app.get_state(config)
print(f"Message count: {len(state.values['messages'])}")
for msg in state.values["messages"]:
role = msg.__class__.__name__
preview = str(msg.content)[:80] if msg.content else "[tool_calls]"
print(f" [{role}] {preview}")
Summary
| Feature | Implementation |
|---|---|
| State | TypedDict with Annotated[list, add_messages] |
| Tools | @tool with Pydantic args_schema, errors returned as strings |
| Graph | StateGraph → agent → conditional → tools → agent |
| Persistence | SqliteSaver checkpointer keyed by thread_id |
| Streaming | app.stream(..., stream_mode="messages") with node filter |
| Multi-user | Separate thread_id per user/session gives isolated state |
In the next chapter we tackle the hard parts: injecting user context safely into tools, adding content moderation with NeMo Guardrails, and implementing human-in-the-loop interrupts.
← Ch 3: LangChain & LangGraph Intro | Ch 5: Tools, Guardrails & Safety →
