[Ch 4] Build Your First Production-Ready Agent

In Ch 3 we built a Hello World agent that worked for a single query. In this chapter we build the real foundation: a QA automation agent that can answer questions about project documentation and generate test cases for features. The tools are stubs for now — we wire up real RAG in Ch 5. What matters here is that the agent loop, state persistence, and streaming all work correctly.
This is the foundation every subsequent chapter builds on.
Project Layout at This Stage
```text
qa-agent/
├── .env.example
├── requirements.txt
└── agent/
    ├── state.py     # AgentState definition
    ├── tools.py     # Tool stubs: search_docs, generate_test_cases
    ├── nodes.py     # Agent and tool nodes
    ├── graph.py     # Graph assembly + compile
    └── main.py      # Entry point: run + stream
```
Installation
```bash
# langgraph-checkpoint-sqlite supplies SqliteSaver; python-dotenv loads .env
pip install langchain-core langchain-openai langgraph langgraph-checkpoint-sqlite python-dotenv
```

```text
# .env.example
OPENAI_API_KEY=your-api-key-here
```

💡 Ollama: To run a local model instead, install langchain-ollama and replace ChatOpenAI in nodes.py with:

```python
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.1)
```
Step 1 — Define the State
The state is the single source of truth for everything the agent knows at any point in the loop.
```python
# agent/state.py
from typing import Annotated, TypedDict

from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    # add_messages reducer: new messages are *appended*, not replaced
    messages: Annotated[list[AnyMessage], add_messages]
```
Why add_messages matters: Without it, every node return would overwrite the entire message list. With it, returning {"messages": [new_msg]} appends new_msg to the existing history. This is what makes multi-turn conversations work.
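You can see the reducer's behavior without building a graph at all. A minimal sketch, calling add_messages directly on a message list:

```python
# add_messages merges an update into existing history instead of replacing it.
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph.message import add_messages

history = [HumanMessage(content="Hi")]
updated = add_messages(history, [AIMessage(content="Hello! How can I help?")])

print(len(updated))  # 2 -- the AI reply was appended after the human turn
```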
Step 2 — Define the Tools
The agent gets two stub tools, each with a Pydantic schema for validated inputs. The stubs only report what they were called with; Ch 5 replaces their bodies with real retrieval and test generation.

```python
# agent/tools.py
from pydantic import BaseModel, Field
from langchain_core.tools import tool


# ── Input schemas ─────────────────────────────────────────────────────────────
class SearchDocsInput(BaseModel):
    query: str = Field(description="Natural-language query against the project documentation")
    k: int = Field(default=3, description="Number of document chunks to retrieve")


class GenerateTestCasesInput(BaseModel):
    feature: str = Field(description="Name or short description of the feature to test")


# ── Tool definitions (stubs until Ch 5) ───────────────────────────────────────
@tool("search_docs", args_schema=SearchDocsInput)
def search_docs(query: str, k: int = 3) -> str:
    """Search the project documentation and return the most relevant passages."""
    if k < 1:
        return "Error: k must be at least 1."
    return (
        f"[stub] search_docs called with query='{query}', k={k}. "
        "Real retrieval will be wired in Ch 5."
    )


@tool("generate_test_cases", args_schema=GenerateTestCasesInput)
def generate_test_cases(feature: str) -> str:
    """Generate test cases for the given feature."""
    return (
        f"[stub] generate_test_cases called for feature='{feature}'. "
        "Real test case generation will be wired in Ch 5."
    )


TOOLS = [search_docs, generate_test_cases]
```
Key practices shown here:
- Every tool has a Pydantic schema (args_schema=) — the LLM gets validated, typed inputs
- Error messages are returned as strings (not exceptions) — tools should never crash the agent loop
- The stub bodies are placeholders; Ch 5 swaps them for FAISS retrieval and context-grounded test generation
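Tools built with @tool can be exercised directly, which is handy for unit-testing the schemas before any LLM is involved. A quick sketch:

```python
# Invoke a tool by hand; the dict is validated against SearchDocsInput first.
from agent.tools import search_docs

print(search_docs.invoke({"query": "login requirements", "k": 3}))
# [stub] search_docs called with query='login requirements', k=3. ...
```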
Step 3 — Define the Nodes
```python
# agent/nodes.py
import os

from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI

from .state import AgentState
from .tools import TOOLS

SYSTEM_PROMPT = """You are a QA automation assistant for a software development team. Help users understand project requirements and generate test cases.

- When asked about requirements, APIs, or user stories, call search_docs to retrieve relevant documentation.
- When asked to generate test cases for a feature, call generate_test_cases.
- Summarize results clearly and concisely after each tool call."""

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    api_key=os.environ.get("OPENAI_API_KEY"),
)
llm_with_tools = llm.bind_tools(TOOLS)


def agent_node(state: AgentState) -> dict:
    """Call the LLM with the full conversation history."""
    messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}


def should_continue(state: AgentState) -> str:
    """Route: call tools if the LLM made a tool call, otherwise finish."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"
```
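The router is plain Python over state, so it can be sanity-checked without an API key. A minimal sketch using hand-built AIMessages:

```python
# should_continue only inspects the last message's tool_calls attribute.
from langchain_core.messages import AIMessage
from agent.nodes import should_continue

done = {"messages": [AIMessage(content="All login requirements summarized.")]}
calling = {"messages": [AIMessage(content="", tool_calls=[
    {"name": "search_docs", "args": {"query": "login", "k": 3}, "id": "call-1"},
])]}

print(should_continue(done))     # "end"
print(should_continue(calling))  # "tools"
```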
Step 4 — Assemble the Graph with a Checkpointer
The checkpointer is what makes the agent remember conversations across multiple calls. It persists the full AgentState after every step — keyed by a thread_id.
```python
# agent/graph.py
import sqlite3

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import ToolNode

from .state import AgentState
from .nodes import agent_node, should_continue
from .tools import TOOLS


def build_graph(db_path: str = "agent_memory.db"):
    """Build and compile the agent graph with SQLite persistence."""
    # SQLite checkpointer — persists state after every node execution.
    # Note: SqliteSaver.from_conn_string() returns a context manager in recent
    # langgraph releases, so we construct the saver from a connection instead.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    checkpointer = SqliteSaver(conn)

    graph = StateGraph(AgentState)

    # Nodes
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(TOOLS))

    # Edges
    graph.set_entry_point("agent")
    graph.add_conditional_edges(
        "agent",
        should_continue,
        {"tools": "tools", "end": END},
    )
    graph.add_edge("tools", "agent")

    return graph.compile(checkpointer=checkpointer)
```
The graph looks like this:

```mermaid
graph TD
    START([START]) --> agent["Agent<br/>(LLM call)"]
    agent --> cond{{"should_continue"}}
    cond -->|"tool_calls present"| tools["ToolNode<br/>(execute tools)"]
    cond -->|"no tool_calls"| END([END])
    tools --> agent
    style START fill:#4CAF50,color:#fff,stroke:none
    style END fill:#F44336,color:#fff,stroke:none
    style agent fill:#9C27B0,color:#fff,stroke:none
    style tools fill:#FF9800,color:#fff,stroke:none
    style cond fill:#2196F3,color:#fff,stroke:none
```
Step 5 — Run with Streaming
```python
# agent/main.py
# Run from the qa-agent/ directory with: python -m agent.main
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage

from .graph import build_graph

load_dotenv()


def chat(app, thread_id: str, user_input: str) -> None:
    """Send a message and stream the agent's response token by token."""
    config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [HumanMessage(content=user_input)]}

    print(f"\nUser: {user_input}")
    print("Assistant: ", end="", flush=True)

    # stream_mode="messages" yields (message_chunk, metadata) tuples
    for chunk, metadata in app.stream(inputs, config=config, stream_mode="messages"):
        # Only print text from the AI (not tool results or empty chunks)
        if (
            chunk.content  # has content
            and metadata.get("langgraph_node") == "agent"  # from the agent node
            and not getattr(chunk, "tool_calls", None)  # not a tool-call message
        ):
            print(chunk.content, end="", flush=True)
    print()  # newline after streaming completes


def main():
    app = build_graph()

    # Each thread_id is a separate conversation with its own persisted history
    thread_a = "qa-session-alice"
    thread_b = "qa-session-bob"

    # --- Thread A: Alice asks about documentation ---
    chat(app, thread_a, "What are the login requirements for the system?")
    chat(app, thread_a, "Generate test cases for the login flow")

    # --- Thread B: Bob's separate conversation ---
    chat(app, thread_b, "What does the payment API support?")
    chat(app, thread_b, "Generate edge case tests for the payment API")

    # --- Thread A continues (state is restored from SQLite) ---
    chat(app, thread_a, "What are the performance requirements?")


if __name__ == "__main__":
    main()
```
Expected output (abbreviated):
```text
User: What are the login requirements for the system?
Assistant: I'll search the documentation for login requirements.
[stub] search_docs called with query='login requirements for the system', k=3.
Real retrieval will be wired in Ch 5. The actual requirements will appear
once FAISS embeddings are enabled in the next chapter.

User: Generate test cases for the login flow
Assistant: [stub] generate_test_cases called for feature='login flow'.
Real test case generation will be wired in Ch 5.

User: What are the performance requirements?
Assistant: I remember you previously asked about login requirements. Now let me
search for performance requirements.
[stub] search_docs called with query='performance requirements', k=3.
```
Understanding Multi-Thread Isolation
The thread_id in the config is the key to how LangGraph separates conversations:
```python
# Thread isolation explained
config_alice = {"configurable": {"thread_id": "qa-session-alice"}}
config_bob = {"configurable": {"thread_id": "qa-session-bob"}}

# Each thread has completely separate state in SQLite:
# - different message histories
# - different checkpoint chains, restored independently on the next call
```
In a real system, you’d scope tool data to the user via thread_id or a user_id passed through config["configurable"] (covered in Ch 6).
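You can verify the isolation directly after running main(). A small sketch using get_state:

```python
# Each thread's checkpoint holds only that conversation's messages.
from agent.graph import build_graph

app = build_graph()
config_alice = {"configurable": {"thread_id": "qa-session-alice"}}
config_bob = {"configurable": {"thread_id": "qa-session-bob"}}

print(len(app.get_state(config_alice).values["messages"]))  # Alice's history only
print(len(app.get_state(config_bob).values["messages"]))    # Bob's history only
```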
Inspecting Persisted State
You can inspect the saved state for any thread at any time — useful for debugging:
```python
# debug_state.py
from agent.graph import build_graph

app = build_graph()

# Get the latest state for a thread (use a thread_id you have actually run)
config = {"configurable": {"thread_id": "qa-session-alice"}}
state = app.get_state(config)

print(f"Message count: {len(state.values['messages'])}")
for msg in state.values["messages"]:
    role = msg.__class__.__name__
    preview = str(msg.content)[:80] if msg.content else "[tool_calls]"
    print(f"  [{role}] {preview}")
```
Summary
| Feature | Implementation |
|---|---|
| State | TypedDict with Annotated[list, add_messages] |
| Tools | @tool with Pydantic args_schema, errors returned as strings |
| Graph | StateGraph → agent → conditional → tools → agent |
| Persistence | SqliteSaver checkpointer keyed by thread_id |
| Streaming | app.stream(..., stream_mode="messages") with node filter |
| Multi-user | Separate thread_id per user/session gives isolated state |
In the next chapter, we make the tools real: we embed and index project documentation using FAISS, implement search_docs with vector similarity search, and implement generate_test_cases using the retrieved context.
