[Ch 4] Build Your First Production-Ready Agent

Apr 7, 2026 · 7 min read

In Ch 3 we built a Hello World agent that worked for a single query. In this chapter we build the real thing: a multi-turn agent that remembers conversation history across sessions, handles tool errors gracefully, streams its output token by token, and can run multiple independent user threads simultaneously.

This is the foundation every subsequent chapter builds on.


Project Structure

agent/
├── .env.example
├── state.py          # AgentState definition
├── tools.py          # Tool definitions
├── nodes.py          # Agent and tool nodes
├── graph.py          # Graph assembly + compile
└── main.py           # Entry point: run + stream

Installation

pip install langchain-core langchain-openai langgraph langgraph-checkpoint-sqlite python-dotenv

# .env.example
OPENAI_API_KEY=your-api-key-here

💡 Ollama: To run against a local model instead, install langchain-ollama (pip install langchain-ollama) and replace ChatOpenAI in nodes.py with:

from langchain_ollama import ChatOllama
llm = ChatOllama(model="llama3.2", temperature=0.1)

Step 1 — Define the State

The state is the single source of truth for everything the agent knows at any point in the loop.

# agent/state.py
from typing import Annotated, TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    # add_messages reducer: new messages are *appended*, not replaced
    messages: Annotated[list[AnyMessage], add_messages]

Why add_messages matters: Without it, every node return would overwrite the entire message list. With it, returning {"messages": [new_msg]} appends new_msg to the existing history. This is what makes multi-turn conversations work.
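
To make the difference concrete, here is a minimal stdlib-only sketch of the two behaviours. The functions overwrite and append_reducer are hypothetical stand-ins written for this illustration, not LangGraph code; the real add_messages reducer does more (for example, it also matches messages by ID), so treat this as a picture of the append semantics only.

```python
from typing import Any


def overwrite(existing: list[Any], update: list[Any]) -> list[Any]:
    """No reducer: whatever the node returns replaces the old value."""
    return update


def append_reducer(existing: list[Any], update: list[Any]) -> list[Any]:
    """Simplified stand-in for add_messages: keep history, add the new items."""
    return existing + update


history = ["user: hi", "ai: hello"]
node_return = ["user: create a task"]  # what a node would return under "messages"

without_reducer = overwrite(history, node_return)
with_reducer = append_reducer(history, node_return)

print(len(without_reducer))  # 1  (earlier turns are lost)
print(len(with_reducer))     # 3  (earlier turns are kept)
```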


Step 2 — Define the Tools

We’ll build a small task management assistant with three tools. Each uses a Pydantic schema for validated inputs.

# agent/tools.py
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from langchain_core.tools import tool

# ── In-memory task store (replace with a real DB in production) ──────────────
_tasks: dict[str, dict] = {}

# ── Input schemas ─────────────────────────────────────────────────────────────

class CreateTaskInput(BaseModel):
    title: str = Field(description="Short title for the task")
    description: str = Field(description="Detailed description of what needs to be done")
    priority: str = Field(
        default="medium",
        description="Task priority: 'low', 'medium', or 'high'",
    )

class UpdateTaskInput(BaseModel):
    task_id: str = Field(description="The ID of the task to update")
    status: str = Field(description="New status: 'todo', 'in_progress', or 'done'")

class ListTasksInput(BaseModel):
    filter_status: str = Field(
        default="all",
        description="Filter by status: 'all', 'todo', 'in_progress', or 'done'",
    )

# ── Tool definitions ──────────────────────────────────────────────────────────

@tool("create_task", args_schema=CreateTaskInput)
def create_task(title: str, description: str, priority: str = "medium") -> str:
    """Create a new task and return its ID."""
    task_id = f"task-{len(_tasks) + 1:03d}"
    _tasks[task_id] = {
        "id": task_id,
        "title": title,
        "description": description,
        "priority": priority,
        "status": "todo",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    return f"Created task {task_id}: '{title}' (priority: {priority})"


@tool("update_task_status", args_schema=UpdateTaskInput)
def update_task_status(task_id: str, status: str) -> str:
    """Update the status of an existing task."""
    if task_id not in _tasks:
        return f"Error: Task '{task_id}' not found."
    valid_statuses = {"todo", "in_progress", "done"}
    if status not in valid_statuses:
        return f"Error: Invalid status '{status}'. Use one of: {sorted(valid_statuses)}"
    _tasks[task_id]["status"] = status
    return f"Updated {task_id} status to '{status}'."


@tool("list_tasks", args_schema=ListTasksInput)
def list_tasks(filter_status: str = "all") -> str:
    """List all tasks, optionally filtered by status."""
    if not _tasks:
        return "No tasks found."
    tasks = list(_tasks.values())
    if filter_status != "all":
        tasks = [t for t in tasks if t["status"] == filter_status]
    if not tasks:
        return f"No tasks with status '{filter_status}'."
    lines = [f"Found {len(tasks)} task(s):\n"]
    for t in tasks:
        lines.append(
            f"  [{t['id']}] {t['title']} | {t['priority'].upper()} | {t['status']}"
        )
    return "\n".join(lines)


TOOLS = [create_task, update_task_status, list_tasks]

Key practices shown here:

  • Every tool has a Pydantic schema (args_schema=): the LLM sees typed, described parameters, and the arguments are validated before the tool body runs
  • Error messages are returned as strings (not exceptions) — tools should never crash the agent loop
  • The in-memory _tasks dict is a stand-in; swap with any DB in production
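
The tools above return error strings by hand. One way to apply the same rule to unexpected failures is a small wrapper that turns any exception into text. This is a sketch under assumptions: errors_as_text and normalize_priority are hypothetical names introduced here, and the wrapper is plain Python rather than a LangChain feature.

```python
import functools
from typing import Callable


def errors_as_text(func: Callable[..., str]) -> Callable[..., str]:
    """Wrap a tool function so exceptions come back as an error string."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> str:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # The model can read this message and retry with better arguments
            return f"Error: {type(exc).__name__}: {exc}"

    return wrapper


@errors_as_text
def normalize_priority(value: str) -> str:
    """Hypothetical helper: accept only the three priorities used in this chapter."""
    allowed = ("low", "medium", "high")
    if value not in allowed:
        raise ValueError(f"unknown priority {value!r}")
    return value


ok = normalize_priority("high")
failed = normalize_priority("urgent")

print(ok)      # high
print(failed)  # Error: ValueError: unknown priority 'urgent'
```

Catching every Exception is a deliberate trade-off for an agent loop: the model gets a chance to recover, but real bugs become easier to miss, so logging the exception as well is probably worthwhile.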

Step 3 — Define the Nodes

# agent/nodes.py
import os
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI
from .state import AgentState
from .tools import TOOLS

SYSTEM_PROMPT = """You are a task management assistant. Help users create, update, and track their tasks.

When a user asks you to do something with tasks, use your tools. After tool calls, summarize 
the result clearly. Always be concise and action-oriented."""

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    api_key=os.environ.get("OPENAI_API_KEY"),
)

llm_with_tools = llm.bind_tools(TOOLS)


def agent_node(state: AgentState) -> dict:
    """Call the LLM with the full conversation history."""
    messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}


def should_continue(state: AgentState) -> str:
    """Route: call tools if the LLM made a tool call, otherwise finish."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"

Step 4 — Assemble the Graph with a Checkpointer

The checkpointer is what makes the agent remember conversations across multiple calls. It persists the full AgentState after every step — keyed by a thread_id.

# agent/graph.py
import sqlite3

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import ToolNode
from .state import AgentState
from .nodes import agent_node, should_continue
from .tools import TOOLS


def build_graph(db_path: str = "agent_memory.db"):
    """Build and compile the agent graph with SQLite persistence."""

    # SQLite checkpointer: persists state after every node execution.
    # In recent langgraph-checkpoint-sqlite releases, SqliteSaver.from_conn_string()
    # is a context manager, so we pass an open connection to the constructor instead.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    checkpointer = SqliteSaver(conn)

    graph = StateGraph(AgentState)

    # Nodes
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(TOOLS))

    # Edges
    graph.set_entry_point("agent")
    graph.add_conditional_edges(
        "agent",
        should_continue,
        {"tools": "tools", "end": END},
    )
    graph.add_edge("tools", "agent")

    return graph.compile(checkpointer=checkpointer)

The graph looks like this:

graph LR
    START([START]) --> agent["agent_node\n(LLM call)"]
    agent --> cond{{"should_continue"}}
    cond -->|"tool_calls present"| tools["ToolNode\n(execute tools)"]
    cond -->|"no tool_calls"| END([END])
    tools --> agent
    style START fill:#4CAF50,color:#fff,stroke:none
    style END fill:#F44336,color:#fff,stroke:none
    style agent fill:#9C27B0,color:#fff,stroke:none
    style tools fill:#FF9800,color:#fff,stroke:none
    style cond fill:#2196F3,color:#fff,stroke:none
Fig 1: Agent graph with SQLite-persisted state
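
If the diagram feels abstract, the same control flow can be written as an ordinary loop. The sketch below is plain Python with no LangGraph involved: Msg, fake_llm, the echo tool, and run_loop are all hypothetical stand-ins, and the step limit is an added safety net that the graph above does not define.

```python
from dataclasses import dataclass, field


@dataclass
class Msg:
    role: str
    content: str = ""
    tool_calls: list[dict] = field(default_factory=list)


def fake_llm(messages: list[Msg]) -> Msg:
    """Hypothetical model: request one tool call, then answer from its result."""
    if messages[-1].role == "tool":
        return Msg("ai", f"Done: {messages[-1].content}")
    return Msg("ai", tool_calls=[{"name": "echo", "args": {"text": "hello"}}])


TOOLS = {"echo": lambda text: text.upper()}


def run_loop(messages: list[Msg], max_steps: int = 5) -> list[Msg]:
    for _ in range(max_steps):
        reply = fake_llm(messages)        # the "agent" node
        messages.append(reply)
        if not reply.tool_calls:          # should_continue returns "end"
            return messages
        for call in reply.tool_calls:     # the "tools" node
            result = TOOLS[call["name"]](**call["args"])
            messages.append(Msg("tool", result))
    raise RuntimeError("step limit reached")


final = run_loop([Msg("user", "say hello")])
print([m.role for m in final])  # ['user', 'ai', 'tool', 'ai']
print(final[-1].content)        # Done: HELLO
```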

Step 5 — Run with Streaming

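# agent/main.py
# Run from the directory that contains agent/:  python -m agent.main
from dotenv import load_dotenv

# Load .env before importing the graph: nodes.py reads OPENAI_API_KEY at import time
load_dotenv()

from langchain_core.messages import HumanMessage  # noqa: E402
from .graph import build_graph  # noqa: E402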


def chat(app, thread_id: str, user_input: str) -> None:
    """Send a message and stream the agent's response token by token."""
    config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [HumanMessage(content=user_input)]}

    print(f"\nUser: {user_input}")
    print("Assistant: ", end="", flush=True)

    # stream_mode="messages" yields (message_chunk, metadata) tuples
    for chunk, metadata in app.stream(inputs, config=config, stream_mode="messages"):
        # Only print text from the AI (not tool results or empty chunks)
        if (
            chunk.content                          # has content
            and metadata.get("langgraph_node") == "agent"  # from the agent node
            and not getattr(chunk, "tool_calls", None)     # not a tool-call message
        ):
            print(chunk.content, end="", flush=True)

    print()  # newline after streaming completes


def main():
    app = build_graph()

    # Each thread_id is a separate conversation with its own persisted history
    thread_a = "user-alice-session-1"
    thread_b = "user-bob-session-1"

    # --- Thread A: Alice's conversation ---
    chat(app, thread_a, "Create a high-priority task: Set up CI/CD pipeline")
    chat(app, thread_a, "Also create a medium task: Write unit tests for auth module")
    chat(app, thread_a, "Show me all my tasks")

    # --- Thread B: Bob's separate conversation (own message history; the in-memory _tasks store is still shared) ---
    chat(app, thread_b, "Create a task: Review Q1 budget report")
    chat(app, thread_b, "Show me all tasks")

    # --- Thread A resumes (state is restored from SQLite) ---
    chat(app, thread_a, "Mark the CI/CD task as in_progress")
    chat(app, thread_a, "Show me only in_progress tasks")


if __name__ == "__main__":
    main()

Expected output (abbreviated; exact wording varies by model and run):

User: Create a high-priority task: Set up CI/CD pipeline
Assistant: I've created the task for you! Created task task-001: 'Set up CI/CD
pipeline' (priority: high). What else would you like to do?

User: Show me all my tasks
Assistant: Here are your current tasks:
- [task-001] Set up CI/CD pipeline | HIGH | todo
- [task-002] Write unit tests for auth module | MEDIUM | todo

User: Mark the CI/CD task as in_progress
Assistant: Done! Updated task-001 status to 'in_progress'.
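
The filter inside chat() can be tried without an API key by feeding it hand-made chunks. This is a sketch under assumptions: visible_text is a hypothetical helper that repeats the same three conditions, and the SimpleNamespace objects only imitate the attributes the filter reads; they are not real LangChain message chunks.

```python
from types import SimpleNamespace


def visible_text(stream):
    """Yield only the text that chat() would print to the user."""
    for chunk, metadata in stream:
        if (
            chunk.content                                   # has content
            and metadata.get("langgraph_node") == "agent"   # from the agent node
            and not getattr(chunk, "tool_calls", None)      # not a tool-call message
        ):
            yield chunk.content


# Hand-made stand-ins for (message_chunk, metadata) tuples
fake_stream = [
    (SimpleNamespace(content="", tool_calls=[{"name": "create_task"}]),
     {"langgraph_node": "agent"}),
    (SimpleNamespace(content="Created task task-001", tool_calls=[]),
     {"langgraph_node": "tools"}),
    (SimpleNamespace(content="Done", tool_calls=[]), {"langgraph_node": "agent"}),
    (SimpleNamespace(content="!", tool_calls=[]), {"langgraph_node": "agent"}),
]

streamed = "".join(visible_text(fake_stream))
print(streamed)  # Done!
```

The tool-call request and the raw tool result are both dropped, so the user sees only the model's final wording.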

Understanding Multi-Thread Isolation

The thread_id in the config is the key to how LangGraph separates conversations:

# Thread isolation explained
config_alice = {"configurable": {"thread_id": "user-alice-session-1"}}
config_bob   = {"configurable": {"thread_id": "user-bob-session-1"}}

# Each thread has completely separate state in SQLite:
# - Different message histories
# - The tools share the global `_tasks` dict here (because it's in-memory),
#   but in production each user would have their own DB-scoped tasks
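
The idea of one history per thread_id can be pictured with a few lines of stdlib sqlite3. This is a simplified illustration, not the schema SqliteSaver actually uses: the history table, save_message, and load_history are hypothetical, and the database lives in memory for the demo.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")  # demo only; a file path would persist across runs
conn.execute(
    "CREATE TABLE history (thread_id TEXT, position INTEGER, message TEXT)"
)


def save_message(thread_id: str, message: dict) -> None:
    """Append one message to the given thread's history."""
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM history WHERE thread_id = ?", (thread_id,)
    ).fetchone()
    conn.execute(
        "INSERT INTO history VALUES (?, ?, ?)",
        (thread_id, count, json.dumps(message)),
    )


def load_history(thread_id: str) -> list[dict]:
    """Return the messages of one thread, oldest first."""
    rows = conn.execute(
        "SELECT message FROM history WHERE thread_id = ? ORDER BY position",
        (thread_id,),
    ).fetchall()
    return [json.loads(row[0]) for row in rows]


save_message("user-alice-session-1", {"role": "user", "content": "Create a task"})
save_message("user-bob-session-1", {"role": "user", "content": "Show me all tasks"})
save_message("user-alice-session-1", {"role": "ai", "content": "Created task-001"})

alice_history = load_history("user-alice-session-1")
bob_history = load_history("user-bob-session-1")
print(len(alice_history), len(bob_history))  # 2 1
```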

In a real system, you’d scope tool data to the user via thread_id or a user_id passed through config["configurable"] (covered in Ch 5).
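
As a rough picture of what user-scoped tool data could look like, the sketch below keys the in-memory store by a user_id. Everything here is hypothetical (_tasks_by_user, create_task_for, list_tasks_for), and the user_id is passed explicitly; how it would reach a tool through the config is left to Ch 5.

```python
from collections import defaultdict

# One task dict per user instead of a single global dict
_tasks_by_user: dict[str, dict[str, dict]] = defaultdict(dict)


def create_task_for(user_id: str, title: str) -> str:
    """Create a task in the given user's store and return its ID."""
    tasks = _tasks_by_user[user_id]
    task_id = f"task-{len(tasks) + 1:03d}"
    tasks[task_id] = {"id": task_id, "title": title, "status": "todo"}
    return task_id


def list_tasks_for(user_id: str) -> list[str]:
    """Return only the titles that belong to this user."""
    return [t["title"] for t in _tasks_by_user[user_id].values()]


create_task_for("alice", "Set up CI/CD pipeline")
create_task_for("bob", "Review Q1 budget report")

alice_titles = list_tasks_for("alice")
bob_titles = list_tasks_for("bob")
print(alice_titles)  # ['Set up CI/CD pipeline']
print(bob_titles)    # ['Review Q1 budget report']
```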


Inspecting Persisted State

You can inspect the saved state for any thread at any time — useful for debugging:

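# debug_state.py
from dotenv import load_dotenv

# nodes.py reads OPENAI_API_KEY at import time, so load .env first
load_dotenv()

from agent.graph import build_graph  # noqa: E402

app = build_graph()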

# Get the latest state for a thread
config = {"configurable": {"thread_id": "user-alice-session-1"}}
state = app.get_state(config)

print(f"Message count: {len(state.values['messages'])}")
for msg in state.values["messages"]:
    role = msg.__class__.__name__
    preview = str(msg.content)[:80] if msg.content else "[tool_calls]"
    print(f"  [{role}] {preview}")

Summary

Feature       Implementation
State         TypedDict with Annotated[list, add_messages]
Tools         @tool with Pydantic args_schema, errors returned as strings
Graph         StateGraph → agent → conditional → tools → agent
Persistence   SqliteSaver checkpointer keyed by thread_id
Streaming     app.stream(..., stream_mode="messages") with node filter
Multi-user    Separate thread_id per user/session gives isolated state

In the next chapter we tackle the hard parts: injecting user context safely into tools, adding content moderation with NeMo Guardrails, and implementing human-in-the-loop interrupts.

