[Ch 7] Observability with Langfuse — Tracing Every Agent Step

Apr 7, 2026 · 7 min read

When something goes wrong in production — and it will — you need to answer: What did the LLM receive? Which tool did it call? What did the tool return? How long did each step take? How much did this session cost?

Without observability, debugging a multi-step agent is guesswork. This chapter adds Langfuse to give you full visibility into every agent execution.


What Langfuse Provides

graph TD
    A["🔁 Agent Run"] --> T["Trace\n(one per user session)"]
    T --> S1["Span: agent_node\n(LLM generation)"]
    T --> S2["Span: tool — search_documents\n(function execution)"]
    T --> S3["Span: agent_node\n(LLM generation — 2nd call)"]
    S1 --> G1["Generation\n• model, tokens, cost\n• input messages\n• output message"]
    S3 --> G2["Generation\n• model, tokens, cost\n• input messages\n• output message"]
    style A fill:#4CAF50,color:#fff,stroke:none
    style T fill:#7C3AED,color:#fff,stroke:none
    style S1 fill:#2563EB,color:#fff,stroke:none
    style S2 fill:#FF9800,color:#fff,stroke:none
    style S3 fill:#2563EB,color:#fff,stroke:none
    style G1 fill:#059669,color:#fff,stroke:none
    style G2 fill:#059669,color:#fff,stroke:none
Fig 1: Langfuse trace hierarchy — one trace per agent run, spans per step, generations for LLM calls
| Concept | Description |
|---|---|
| Trace | One complete agent run (from user input to final response) |
| Span | A single step within the trace (tool call, node execution) |
| Generation | An LLM API call with model, token counts, cost, and messages |
| Score | A metric attached post-hoc (e.g., user thumbs up, eval score) |

Installation & Setup

pip install langfuse
# .env.example
OPENAI_API_KEY=your-api-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=https://cloud.langfuse.com   # or your self-hosted URL

Sign up for a free account at cloud.langfuse.com to get your keys.


Part 1: trace_agent_execution — Wrapping the Full Run

The cleanest integration pattern is a context manager that wraps the entire agent invocation. One context manager = one Langfuse trace.

# agent/tracing/langfuse_client.py
import os
from langfuse import Langfuse

def get_langfuse_client() -> Langfuse:
    """Create a Langfuse client from environment variables."""
    return Langfuse(
        public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
        host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
    )

langfuse = get_langfuse_client()
# agent/tracing/trace_agent.py
import time
from contextlib import asynccontextmanager
from .langfuse_client import langfuse
from .sanitize import sanitize_payload


@asynccontextmanager
async def trace_agent_execution(
    session_id: str,
    user_id: str,
    user_input: str,
    metadata: dict | None = None,
):
    """
    Context manager that wraps a full agent run in a single Langfuse trace.

    Usage:
        async with trace_agent_execution(session_id, user_id, input) as trace:
            result = await run_agent(input)
            trace.update(output=result)
    """
    trace = langfuse.trace(
        name="agent-run",
        session_id=session_id,
        user_id=user_id,
        input=sanitize_payload({"user_input": user_input}),
        metadata=metadata or {},
    )
    start_time = time.perf_counter()
    try:
        yield trace
    except Exception as exc:
        trace.update(
            output={"error": str(exc)},
            metadata={"status": "error", "status_message": str(exc)},
        )
        raise
    finally:
        elapsed_ms = int((time.perf_counter() - start_time) * 1000)
        trace.update(metadata={"duration_ms": elapsed_ms, **(metadata or {})})
        langfuse.flush()   # ensure all events are sent before the function returns
# agent/main.py — using the context manager
from langchain_core.messages import HumanMessage

from .tracing.trace_agent import trace_agent_execution
from .tracing.sanitize import sanitize_payload

async def run_agent_with_tracing(
    app,
    thread_id: str,
    user_id: str,
    user_input: str,
) -> str:
    async with trace_agent_execution(
        session_id=thread_id,
        user_id=user_id,
        user_input=user_input,
    ) as trace:
        config = {"configurable": {"thread_id": thread_id}}
        final_state = await app.ainvoke(
            {"messages": [HumanMessage(content=user_input)]},
            config=config,
        )
        response = final_state["messages"][-1].content
        trace.update(output={"response": sanitize_payload(response)})
        return response
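The key guarantee of this pattern is in the error path: if the agent raises, the exception is recorded on the trace *and* still propagates to the caller. The behavior can be demonstrated in isolation with a stand-in trace object (`FakeTrace` and `trace_run` below are illustrative names, not part of the Langfuse SDK):

```python
import asyncio
from contextlib import asynccontextmanager

# Minimal stand-in for the Langfuse trace object (illustration only)
class FakeTrace:
    def __init__(self):
        self.updates = []

    def update(self, **kwargs):
        self.updates.append(kwargs)

@asynccontextmanager
async def trace_run():
    """Mimics trace_agent_execution's error handling with a fake trace."""
    trace = FakeTrace()
    try:
        yield trace
    except Exception as exc:
        trace.update(output={"error": str(exc)})  # error is recorded...
        raise                                     # ...and still propagates

async def main():
    trace_ref = None
    try:
        async with trace_run() as trace:
            trace_ref = trace
            raise RuntimeError("tool timeout")
    except RuntimeError:
        pass  # the caller still sees the exception
    return trace_ref.updates

updates = asyncio.run(main())
print(updates)  # [{'output': {'error': 'tool timeout'}}]
```

Re-raising inside the `except` block is what keeps your normal error handling (retries, HTTP 500s) working — tracing observes failures without swallowing them.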

Part 2: @trace_tool — Per-Tool Span Tracing

Each tool call should appear as a separate span inside the trace. A decorator handles this cleanly:

# agent/tracing/trace_tool.py
import time
import functools
from typing import Callable, Any
from .langfuse_client import langfuse
from .sanitize import sanitize_payload


def trace_tool(
    tool_name: str | None = None,
    tags: list[str] | None = None,
) -> Callable:
    """
    Decorator that traces a tool function call as a Langfuse span.

    Usage:
        @trace_tool(tags=["retrieval"])
        def search_documents(query: str) -> str: ...
    """
    def decorator(func: Callable) -> Callable:
        name = tool_name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Get the current active trace from LangGraph's RunnableConfig
            # (passed through kwargs as 'config' if present)
            config = kwargs.get("config") or {}
            trace_id = config.get("configurable", {}).get("langfuse_trace_id")

            # Create a span under the current trace (or standalone if no trace)
            span = langfuse.span(
                trace_id=trace_id,
                name=f"tool.{name}",
                input=sanitize_payload({"args": args, "kwargs": {
                    k: v for k, v in kwargs.items() if k != "config"
                }}),
                metadata={"tags": tags or []},  # tags are trace-level; keep them as span metadata
            )

            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                elapsed_ms = int((time.perf_counter() - start) * 1000)
                span.end(
                    output=sanitize_payload({"result": result}),
                    metadata={"duration_ms": elapsed_ms},
                )
                return result
            except Exception as exc:
                span.end(
                    output={"error": str(exc)},
                    level="ERROR",
                    status_message=str(exc),
                )
                raise

        return wrapper
    return decorator

Apply it to your tools:

# agent/tools.py
from .tracing.trace_tool import trace_tool

# Note: @trace_tool sits *below* @tool so it wraps the raw function;
# @tool then turns the traced function into a proper LangChain tool.
@tool("search_knowledge_base", args_schema=SearchKnowledgeInput)
@trace_tool(tags=["retrieval"])
def search_knowledge_base(query: str, k: int = 3) -> str:
    """Search the company knowledge base..."""
    ...

@tool("create_task", args_schema=CreateTaskInput)
@trace_tool(tags=["write"])
def create_task(title: str, description: str, priority: str = "medium") -> str:
    """Create a new task..."""
    ...
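A quick aside on why the stacking works at all: `functools.wraps` copies the wrapped function's `__name__` and `__doc__` onto the wrapper, so `@tool` can still derive the tool's name and description from the traced function. A standalone sketch (`trace_tool_demo` is a simplified stand-in with the tracing removed):

```python
import functools

def trace_tool_demo(func):
    """Simplified version of trace_tool — tracing omitted."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@trace_tool_demo
def search_documents(query: str) -> str:
    """Search the document store."""
    return f"results for {query}"

# wraps preserves the metadata that @tool reads for name and description
print(search_documents.__name__)  # search_documents
print(search_documents.__doc__)   # Search the document store.
```

Without `functools.wraps`, every traced tool would show up as `wrapper` with no docstring, and `@tool` would fail to build a useful schema.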

Part 3: Payload Sanitization

Never log raw payloads blindly. Production tool calls may contain:

  • User PII (names, emails, phone numbers)
  • Internal IDs or file paths
  • Very large strings that exceed Langfuse’s payload limits

A recursive sanitize function handles all three:

# agent/tracing/sanitize.py
from typing import Any

MAX_STRING_LENGTH = 2000    # truncate strings longer than this
MAX_LIST_ITEMS   = 20       # truncate lists longer than this


def sanitize_payload(value: Any, _depth: int = 0) -> Any:
    """
    Recursively sanitize a value before sending to Langfuse:
    - Truncate long strings
    - Truncate long lists
    - Mask known sensitive key names
    - Limit recursion depth
    """
    if _depth > 5:
        return "[max depth exceeded]"

    if isinstance(value, str):
        if len(value) > MAX_STRING_LENGTH:
            return value[:MAX_STRING_LENGTH] + f"... [truncated, {len(value)} chars total]"
        return value

    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if _is_sensitive_key(k) else sanitize_payload(v, _depth + 1)
            for k, v in value.items()
        }

    if isinstance(value, list):
        truncated = value[:MAX_LIST_ITEMS]
        result = [sanitize_payload(item, _depth + 1) for item in truncated]
        if len(value) > MAX_LIST_ITEMS:
            result.append(f"[{len(value) - MAX_LIST_ITEMS} more items truncated]")
        return result

    # Primitives (int, float, bool, None) pass through unchanged
    return value


_SENSITIVE_KEYS = frozenset({
    "password", "secret", "token", "api_key", "apikey",
    "authorization", "credential", "private_key", "access_key",
    "session_token", "refresh_token", "ssn", "credit_card",
})

def _is_sensitive_key(key: str) -> bool:
    key_lower = key.lower().replace("-", "_").replace(" ", "_")
    return any(s in key_lower for s in _SENSITIVE_KEYS)
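To make the behavior concrete, here is a condensed, self-contained re-implementation of the same logic run against a sample payload (the `sanitize` helper and constants below are illustrative, not the module above):

```python
# Condensed sketch of the sanitizer: truncation + key masking only
MAX_STRING_LENGTH = 2000
SENSITIVE = {"password", "token", "api_key"}

def sanitize(value, _depth=0):
    if _depth > 5:
        return "[max depth exceeded]"
    if isinstance(value, str) and len(value) > MAX_STRING_LENGTH:
        return value[:MAX_STRING_LENGTH] + f"... [truncated, {len(value)} chars total]"
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if k.lower() in SENSITIVE else sanitize(v, _depth + 1)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [sanitize(v, _depth + 1) for v in value]
    return value

clean = sanitize({
    "query": "refund policy",       # short string: passes through
    "api_key": "sk-live-123",       # sensitive key: masked
    "doc": "x" * 5000,              # oversized string: truncated
})
print(clean["api_key"])  # [REDACTED]
```

Note that masking is keyed on the *field name*, not the value — a secret passed under an innocuous key like `note` would still leak, which is why truncation limits matter too.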

Part 4: Cost Tracking

Track token usage and compute cost from the LLM callback:

# agent/tracing/cost_callback.py
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult
from .langfuse_client import langfuse

# Pricing per 1M tokens (update from OpenAI pricing page)
PRICING = {
    "gpt-4o":        {"input": 5.00,  "output": 15.00},
    "gpt-4o-mini":   {"input": 0.15,  "output": 0.60},
    "gpt-4-turbo":   {"input": 10.00, "output": 30.00},
}


class CostCallbackHandler(AsyncCallbackHandler):
    """Callback that tracks token usage and computes cost after each LLM call."""

    def __init__(self, trace_id: str | None = None):
        self.trace_id = trace_id
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost_usd = 0.0

    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        usage = None

        # Token usage may appear under either key depending on the integration
        if response.llm_output:
            usage = response.llm_output.get("token_usage") or \
                    response.llm_output.get("usage")

        if usage is None:
            return

        input_tokens  = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)
        model         = (response.llm_output or {}).get("model_name", "gpt-4o-mini")

        # Compute cost
        pricing = PRICING.get(model, PRICING["gpt-4o-mini"])
        cost = (
            input_tokens  / 1_000_000 * pricing["input"] +
            output_tokens / 1_000_000 * pricing["output"]
        )

        self.total_input_tokens  += input_tokens
        self.total_output_tokens += output_tokens
        self.total_cost_usd      += cost

        # Log to Langfuse
        if self.trace_id:
            langfuse.generation(
                trace_id=self.trace_id,
                name="llm-call",
                model=model,
                usage={
                    "input":  input_tokens,
                    "output": output_tokens,
                    "unit":   "TOKENS",
                },
                metadata={"cost_usd": round(cost, 6)},
            )
# agent/nodes.py — attach cost callback per call
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig

from .tracing.cost_callback import CostCallbackHandler

def agent_node(state: AgentState, config: RunnableConfig) -> dict:
    trace_id = config.get("configurable", {}).get("langfuse_trace_id")
    cost_cb = CostCallbackHandler(trace_id=trace_id)

    messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
    response = llm_with_tools.invoke(messages, config={"callbacks": [cost_cb]})
    return {"messages": [response]}
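As a sanity check on the arithmetic in `on_llm_end`: with the gpt-4o-mini prices from the table above, a typical agent turn of ~3,000 prompt tokens and ~500 completion tokens costs well under a tenth of a cent (`estimate_cost` is an illustrative helper, not part of the handler):

```python
# Per-1M-token pricing, same figures as the PRICING table above
PRICING = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Replicates the cost formula used in CostCallbackHandler."""
    p = PRICING[model]
    return (
        input_tokens / 1_000_000 * p["input"]
        + output_tokens / 1_000_000 * p["output"]
    )

cost = estimate_cost("gpt-4o-mini", 3_000, 500)
print(round(cost, 6))  # 0.00075
```

That is $0.00075 per turn — but note how quickly multi-turn sessions compound: the full conversation history is re-sent each turn, so input tokens (and cost) grow roughly quadratically with turn count. This is exactly the trend the per-trace cost view in Langfuse makes visible.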

Viewing Traces in Langfuse

After running your agent, open cloud.langfuse.com → Traces. You’ll see:

| What you see | What it tells you |
|---|---|
| Trace timeline (waterfall) | Which step took the longest — usually the LLM call or a slow tool |
| Input/output per span | Exact messages the LLM received and returned — invaluable for prompt debugging |
| Token counts per generation | Identify high-cost sessions; track context window growth over turns |
| Cost per trace / per user | Attribute spending to users or features for billing / optimization |
| Error spans (red) | Which tool failed, with the exact error message and input that caused it |

.env.example

# .env.example
OPENAI_API_KEY=your-api-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-your-key
LANGFUSE_SECRET_KEY=sk-lf-your-key
LANGFUSE_HOST=https://cloud.langfuse.com
MONGODB_URI=mongodb://localhost:27017
MONGODB_DB=agent_memory

💡 Self-hosted Langfuse: Set LANGFUSE_HOST to your instance URL. Langfuse is fully open-source and can run locally via Docker Compose. See the self-hosting docs.


Summary

| Component | Purpose |
|---|---|
| trace_agent_execution() | Context manager wrapping the full agent run as one Langfuse trace |
| @trace_tool | Decorator that creates a Langfuse span per tool call |
| sanitize_payload() | Truncate large strings, mask sensitive keys before logging |
| CostCallbackHandler | LangChain callback that computes and logs token cost per LLM call |
| langfuse.flush() | Ensure all buffered events are sent before the request ends |

In the final chapter, we close the loop: how to systematically measure whether your agent is actually working correctly, using a three-stage evaluation pipeline.


← Ch 6: Memory Management | Ch 8: Building an Evaluation System →