# [Ch 7] Observability with Langfuse — Tracing Every Agent Step
When something goes wrong in production — and it will — you need to answer:

- What did the LLM receive?
- Which tool did it call?
- What did the tool return?
- How long did each step take?
- How much did this session cost?

Without observability, debugging a multi-step agent is guesswork. This chapter adds Langfuse to give you full visibility into every agent execution.
## What Langfuse Provides
| Concept | Description |
|---|---|
| Trace | One complete agent run (from user input to final response) |
| Span | A single step within the trace (tool call, node execution) |
| Generation | An LLM API call with model, token counts, cost, and messages |
| Score | A metric attached post-hoc (e.g., user thumbs up, eval score) |
## Installation & Setup

```bash
pip install langfuse
```

```ini
# .env.example
OPENAI_API_KEY=your-api-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=https://cloud.langfuse.com  # or your self-hosted URL
```
Sign up for a free account at cloud.langfuse.com to get your keys.
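It helps to fail fast when keys are missing, rather than silently dropping traces. A minimal startup check might look like this (the `missing_langfuse_vars` helper is our own, not part of the Langfuse SDK):

```python
import os

# Variables the Langfuse client needs; a small helper of our own, not SDK code.
REQUIRED_VARS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY")


def missing_langfuse_vars() -> list[str]:
    """Return the names of any required Langfuse variables that are unset."""
    return [name for name in REQUIRED_VARS if not os.environ.get(name)]


if missing := missing_langfuse_vars():
    print(f"Warning: missing Langfuse config: {', '.join(missing)}")
```

Run this once at application startup so misconfigured deployments surface immediately instead of producing empty dashboards.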
## Part 1: `trace_agent_execution` — Wrapping the Full Run
The cleanest integration pattern is a context manager that wraps the entire agent invocation. One context manager = one Langfuse trace.
```python
# agent/tracing/langfuse_client.py
import os

from langfuse import Langfuse


def get_langfuse_client() -> Langfuse:
    """Create a Langfuse client from environment variables."""
    return Langfuse(
        public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
        host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
    )


langfuse = get_langfuse_client()
```
```python
# agent/tracing/trace_agent.py
import time
from contextlib import asynccontextmanager

from .langfuse_client import langfuse
from .sanitize import sanitize_payload


@asynccontextmanager
async def trace_agent_execution(
    session_id: str,
    user_id: str,
    user_input: str,
    metadata: dict | None = None,
):
    """
    Context manager that wraps a full agent run in a single Langfuse trace.

    Usage:
        async with trace_agent_execution(session_id, user_id, user_input) as trace:
            result = await run_agent(user_input)
            trace.update(output=result)
    """
    trace = langfuse.trace(
        name="agent-run",
        session_id=session_id,
        user_id=user_id,
        input=sanitize_payload({"user_input": user_input}),
        metadata=metadata or {},
    )
    start_time = time.perf_counter()
    try:
        yield trace
    except Exception as exc:
        trace.update(
            output={"error": str(exc)},
            level="ERROR",
            status_message=str(exc),
        )
        raise
    finally:
        elapsed_ms = int((time.perf_counter() - start_time) * 1000)
        trace.update(metadata={"duration_ms": elapsed_ms, **(metadata or {})})
        langfuse.flush()  # ensure all events are sent before the function returns
```
```python
# agent/main.py — using the context manager
from langchain_core.messages import HumanMessage

from .tracing.sanitize import sanitize_payload
from .tracing.trace_agent import trace_agent_execution


async def run_agent_with_tracing(
    app,
    thread_id: str,
    user_id: str,
    user_input: str,
) -> str:
    async with trace_agent_execution(
        session_id=thread_id,
        user_id=user_id,
        user_input=user_input,
    ) as trace:
        config = {
            "configurable": {
                "thread_id": thread_id,
                # Thread the trace id through the RunnableConfig so tools
                # and nodes can attach their spans to this trace.
                "langfuse_trace_id": trace.id,
            }
        }
        final_state = await app.ainvoke(
            {"messages": [HumanMessage(content=user_input)]},
            config=config,
        )
        response = final_state["messages"][-1].content
        trace.update(output={"response": sanitize_payload(response)})
        return response
```
## Part 2: `@trace_tool` — Per-Tool Span Tracing
Each tool call should appear as a separate span inside the trace. A decorator handles this cleanly:
```python
# agent/tracing/trace_tool.py
import functools
import time
from typing import Any, Callable

from .langfuse_client import langfuse
from .sanitize import sanitize_payload


def trace_tool(
    tool_name: str | None = None,
    tags: list[str] | None = None,
) -> Callable:
    """
    Decorator that traces a tool function call as a Langfuse span.

    Usage:
        @trace_tool(tags=["retrieval"])
        def search_documents(query: str) -> str: ...
    """
    def decorator(func: Callable) -> Callable:
        name = tool_name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Get the current active trace id from LangGraph's RunnableConfig
            # (passed through kwargs as 'config' if present).
            config = kwargs.get("config") or {}
            trace_id = config.get("configurable", {}).get("langfuse_trace_id")
            # Create a span under the current trace (or standalone if no trace).
            # Spans don't carry tags directly, so we record them as metadata.
            span = langfuse.span(
                trace_id=trace_id,
                name=f"tool.{name}",
                input=sanitize_payload({
                    "args": args,
                    "kwargs": {k: v for k, v in kwargs.items() if k != "config"},
                }),
                metadata={"tags": tags or []},
            )
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                elapsed_ms = int((time.perf_counter() - start) * 1000)
                span.end(
                    output=sanitize_payload({"result": result}),
                    metadata={"duration_ms": elapsed_ms},
                )
                return result
            except Exception as exc:
                span.end(
                    output={"error": str(exc)},
                    level="ERROR",
                    status_message=str(exc),
                )
                raise
        return wrapper
    return decorator
```
Apply it to your tools:
```python
# agent/tools.py
from langchain_core.tools import tool

from .tracing.trace_tool import trace_tool


# Note the decorator order: @trace_tool wraps the raw function, and @tool
# (outermost) then registers the traced function as a LangChain tool.
@tool("search_knowledge_base", args_schema=SearchKnowledgeInput)
@trace_tool(tags=["retrieval"])
def search_knowledge_base(query: str, k: int = 3) -> str:
    """Search the company knowledge base..."""
    ...


@tool("create_task", args_schema=CreateTaskInput)
@trace_tool(tags=["write"])
def create_task(title: str, description: str, priority: str = "medium") -> str:
    """Create a new task..."""
    ...
```
## Part 3: Payload Sanitization
Never log raw payloads blindly. Production tool calls may contain:
- User PII (names, emails, phone numbers)
- Internal IDs or file paths
- Very large strings that exceed Langfuse’s payload limits
A recursive `sanitize_payload` function handles all of these:
```python
# agent/tracing/sanitize.py
from typing import Any

MAX_STRING_LENGTH = 2000  # truncate strings longer than this
MAX_LIST_ITEMS = 20       # truncate lists longer than this

_SENSITIVE_KEYS = frozenset({
    "password", "secret", "token", "api_key", "apikey",
    "authorization", "credential", "private_key", "access_key",
    "session_token", "refresh_token", "ssn", "credit_card",
})


def _is_sensitive_key(key: str) -> bool:
    key_lower = key.lower().replace("-", "_").replace(" ", "_")
    return any(s in key_lower for s in _SENSITIVE_KEYS)


def sanitize_payload(value: Any, _depth: int = 0) -> Any:
    """
    Recursively sanitize a value before sending to Langfuse:
    - Truncate long strings
    - Truncate long lists
    - Mask known sensitive key names
    - Limit recursion depth
    """
    if _depth > 5:
        return "[max depth exceeded]"
    if isinstance(value, str):
        if len(value) > MAX_STRING_LENGTH:
            return value[:MAX_STRING_LENGTH] + f"... [truncated, {len(value)} chars total]"
        return value
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if _is_sensitive_key(k) else sanitize_payload(v, _depth + 1)
            for k, v in value.items()
        }
    if isinstance(value, list):
        truncated = value[:MAX_LIST_ITEMS]
        result = [sanitize_payload(item, _depth + 1) for item in truncated]
        if len(value) > MAX_LIST_ITEMS:
            result.append(f"[{len(value) - MAX_LIST_ITEMS} more items truncated]")
        return result
    # Primitives (int, float, bool, None) pass through unchanged
    return value
```
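To see the masking and truncation rules in action, here is a quick standalone check. The constants and key check are re-declared in minimal form so the snippet runs without the module above:

```python
# Standalone sketch of the sanitization rules; minimal re-declarations,
# not the full sanitize.py module.
MAX_STRING_LENGTH = 2000
_SENSITIVE = ("password", "token", "api_key")


def _is_sensitive_key(key: str) -> bool:
    key_lower = key.lower().replace("-", "_")
    return any(s in key_lower for s in _SENSITIVE)


def sanitize(payload: dict) -> dict:
    out = {}
    for k, v in payload.items():
        if _is_sensitive_key(k):
            out[k] = "[REDACTED]"
        elif isinstance(v, str) and len(v) > MAX_STRING_LENGTH:
            out[k] = v[:MAX_STRING_LENGTH] + f"... [truncated, {len(v)} chars total]"
        else:
            out[k] = v
    return out


clean = sanitize({"query": "q3 targets", "Api-Key": "sk-123", "doc": "x" * 3000})
print(clean["Api-Key"])  # → [REDACTED]
```

Note that `Api-Key` is caught despite the hyphen and mixed case: the key check normalizes before matching, which is why substring matching on normalized names is used rather than exact comparison.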
## Part 4: Cost Tracking
Track token usage and compute cost from the LLM callback:
```python
# agent/tracing/cost_callback.py
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult

from .langfuse_client import langfuse

# Pricing per 1M tokens (update from the OpenAI pricing page)
PRICING = {
    "gpt-4o": {"input": 5.00, "output": 15.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
}


class CostCallbackHandler(AsyncCallbackHandler):
    """Callback that tracks token usage and computes cost after each LLM call."""

    def __init__(self, trace_id: str | None = None):
        self.trace_id = trace_id
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost_usd = 0.0

    async def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        usage = None
        # Providers report usage under different keys
        if response.llm_output:
            usage = (
                response.llm_output.get("token_usage")
                or response.llm_output.get("usage")
            )
        if usage is None:
            return
        input_tokens = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)
        model = (response.llm_output or {}).get("model_name", "gpt-4o-mini")

        # Compute cost from the per-million-token pricing table
        pricing = PRICING.get(model, PRICING["gpt-4o-mini"])
        cost = (
            input_tokens / 1_000_000 * pricing["input"]
            + output_tokens / 1_000_000 * pricing["output"]
        )
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.total_cost_usd += cost

        # Log to Langfuse as a generation attached to the current trace
        if self.trace_id:
            langfuse.generation(
                trace_id=self.trace_id,
                name="llm-call",
                model=model,
                usage={
                    "input": input_tokens,
                    "output": output_tokens,
                    "unit": "TOKENS",
                },
                metadata={"cost_usd": round(cost, 6)},
            )
```
```python
# agent/nodes.py — attach the cost callback per call
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig

from .tracing.cost_callback import CostCallbackHandler


def agent_node(state: AgentState, config: RunnableConfig) -> dict:
    trace_id = config.get("configurable", {}).get("langfuse_trace_id")
    cost_cb = CostCallbackHandler(trace_id=trace_id)
    messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
    response = llm_with_tools.invoke(messages, config={"callbacks": [cost_cb]})
    return {"messages": [response]}
```
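To sanity-check the arithmetic: a call with 12,000 input and 3,000 output tokens on `gpt-4o-mini` costs (12,000 / 1M × $0.15) + (3,000 / 1M × $0.60) = $0.0018 + $0.0018 = $0.0036. As a standalone sketch using the same per-1M-token pricing shape:

```python
# Standalone cost check mirroring the PRICING table above (gpt-4o-mini only).
PRICING = {"gpt-4o-mini": {"input": 0.15, "output": 0.60}}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD given per-1M-token input/output prices."""
    p = PRICING[model]
    return (
        input_tokens / 1_000_000 * p["input"]
        + output_tokens / 1_000_000 * p["output"]
    )


print(round(estimate_cost("gpt-4o-mini", 12_000, 3_000), 6))  # → 0.0036
```

Over many turns the input side usually dominates, because each turn resends the growing message history; the dashboard's per-generation token counts make that growth visible.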
## Viewing Traces in Langfuse
After running your agent, open cloud.langfuse.com → Traces. You’ll see:
| What you see | What it tells you |
|---|---|
| Trace timeline (waterfall) | Which step took the longest — usually the LLM call or a slow tool |
| Input/output per span | Exact messages the LLM received and returned — invaluable for prompt debugging |
| Token counts per generation | Identify high-cost sessions; track context window growth over turns |
| Cost per trace / per user | Attribute spending to users or features for billing / optimization |
| Error spans (red) | Which tool failed, with the exact error message and input that caused it |
## .env.example

```ini
# .env.example
OPENAI_API_KEY=your-api-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-your-key
LANGFUSE_SECRET_KEY=sk-lf-your-key
LANGFUSE_HOST=https://cloud.langfuse.com
MONGODB_URI=mongodb://localhost:27017
MONGODB_DB=agent_memory
```
> 💡 **Self-hosted Langfuse:** Set `LANGFUSE_HOST` to your instance URL. Langfuse is fully open-source and can run locally via Docker Compose. See the self-hosting docs.
## Summary

| Component | Purpose |
|---|---|
| `trace_agent_execution()` | Context manager wrapping the full agent run as one Langfuse trace |
| `@trace_tool` | Decorator that creates a Langfuse span per tool call |
| `sanitize_payload()` | Truncates large strings and masks sensitive keys before logging |
| `CostCallbackHandler` | LangChain callback that computes and logs token cost per LLM call |
| `langfuse.flush()` | Ensures all buffered events are sent before the request ends |
In the final chapter, we close the loop: how to systematically measure whether your agent is actually working correctly, using a three-stage evaluation pipeline.
← Ch 6: Memory Management | Ch 8: Building an Evaluation System →
