LangGraph Quickstart¶
In this quickstart, you will build a multi-agent collaboration system with LangGraph, then learn how to log it with TruLens and collect feedback on its responses.
For evaluation, we will leverage the RAG triad of groundedness, context relevance, and answer relevance, along with trajectory-level evaluations of the agents' execution.
In [ ]:
# !pip install trulens trulens-providers-openai langgraph langchain-community langchain-experimental langchain-openai openai matplotlib
In [ ]:
import os
os.environ["TAVILY_API_KEY"] = "..."
os.environ["OPENAI_API_KEY"] = "..."
os.environ["TRULENS_OTEL_TRACING"] = "1"
Import from TruLens¶
In [ ]:
from trulens.core import TruSession
session = TruSession()
session.reset_database()
Define Tools¶
In [ ]:
from typing import Annotated

from langchain.load.dump import dumps
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from langgraph.prebuilt import create_react_agent
from trulens.core.otel.instrument import instrument
from trulens.otel.semconv.trace import SpanAttributes

tavily_tool = TavilySearchResults(max_results=5)

# Warning: This executes code locally, which can be unsafe when not sandboxed
repl = PythonREPL()


@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    result_str = (
        f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
    )
    return (
        result_str
        + "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
    )
Define Nodes¶
In [ ]:
import json
from typing import Literal

from langchain_core.messages import BaseMessage
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END
from langgraph.graph import MessagesState
from langgraph.types import Command
from trulens.otel.semconv.trace import BASE_SCOPE


def make_system_prompt(suffix: str) -> str:
    return (
        "You are a helpful AI assistant, collaborating with other assistants."
        " Use the provided tools to progress towards answering the question."
        " If you are unable to fully answer, that's OK, another assistant with different tools"
        " will help where you left off. Execute what you can to make progress."
        " If you or any of the other assistants have the final answer or deliverable,"
        " prefix your response with FINAL ANSWER so the team knows to stop."
        f"\n{suffix}"
    )


llm = ChatOpenAI(model="gpt-4o")


def get_next_node(last_message: BaseMessage, goto: str):
    if "FINAL ANSWER" in last_message.content:
        # Any agent decided the work is done
        return END
    return goto


# Research agent and node
research_agent = create_react_agent(
    llm,
    tools=[tavily_tool],
    prompt=make_system_prompt(
        "You can only do research. You are working with a chart generator colleague."
    ),
)


@instrument(
    span_type=SpanAttributes.SpanType.RETRIEVAL,
    attributes=lambda ret, exception, *args, **kwargs: {
        SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0]["messages"][-1].content,
        SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
            json.loads(dumps(message)).get("kwargs", {}).get("content", "")
            for message in ret.update["messages"]
            if isinstance(message, ToolMessage)
        ]
        if hasattr(ret, "update")
        else "No tool call",
    },
)
def research_node(
    state: MessagesState,
) -> Command[Literal["chart_generator", END]]:
    result = research_agent.invoke(state)
    goto = get_next_node(result["messages"][-1], "chart_generator")
    # Wrap in a human message, as not all providers allow
    # an AI message at the last position of the input messages list
    result["messages"][-1] = HumanMessage(
        content=result["messages"][-1].content, name="researcher"
    )
    return Command(
        update={
            # Share the research agent's internal message history with other agents
            "messages": result["messages"],
        },
        goto=goto,
    )


# Chart generator agent and node
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
chart_agent = create_react_agent(
    llm,
    [python_repl_tool],
    prompt=make_system_prompt(
        "You can only generate charts. You are working with a researcher colleague."
    ),
)


@instrument(
    span_type="CHART_GENERATOR_NODE",
    attributes=lambda ret, exception, *args, **kwargs: {
        f"{BASE_SCOPE}.chart_node_input": args[0]["messages"][-1].content,
        f"{BASE_SCOPE}.chart_node_response": (
            ret.update["messages"][-1].content
            if ret and hasattr(ret, "update") and ret.update
            else "No update response"
        ),
    },
)
def chart_node(state: MessagesState) -> Command[Literal["researcher", END]]:
    result = chart_agent.invoke(state)
    goto = get_next_node(result["messages"][-1], "researcher")
    # Wrap in a human message, as not all providers allow
    # an AI message at the last position of the input messages list
    result["messages"][-1] = HumanMessage(
        content=result["messages"][-1].content, name="chart_generator"
    )
    return Command(
        update={
            # Share the chart agent's internal message history with other agents
            "messages": result["messages"],
        },
        goto=goto,
    )


# Chart summary agent and node
chart_summary_agent = create_react_agent(
    llm,
    tools=[],  # Add image processing tools if available/needed.
    prompt=make_system_prompt(
        "You can only generate image captions. You are working with a researcher colleague and a chart generator colleague. "
        + "Your task is to generate a concise summary for the provided chart image saved at a local PATH, where the PATH should be and only be provided by your chart generator colleague. The summary should be no more than 3 sentences."
    ),
)


@instrument(
    span_type="CHART_SUMMARY_NODE",
    attributes=lambda ret, exception, *args, **kwargs: {
        f"{BASE_SCOPE}.summary_node_input": args[0]["messages"][-1].content,
        f"{BASE_SCOPE}.summary_node_output": ret.update["messages"][-1].content
        if hasattr(ret, "update")
        else "NO SUMMARY GENERATED",
    },
)
def chart_summary_node(
    state: MessagesState,
) -> Command[Literal["researcher", END]]:
    result = chart_summary_agent.invoke(state)
    # After captioning the image, send control back (e.g., to the researcher)
    goto = get_next_node(result["messages"][-1], "researcher")
    # Wrap the output message in a HumanMessage to keep the conversation flow consistent.
    result["messages"][-1] = HumanMessage(
        content=result["messages"][-1].content, name="chart_summarizer"
    )
    return Command(
        update={"messages": result["messages"]},
        goto=goto,
    )
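The `get_next_node` helper is what lets any agent end the run: as soon as a message contains "FINAL ANSWER", control routes to `END` instead of the next agent. A quick illustration with hypothetical messages (no LLM calls involved):
In [ ]:
from langchain_core.messages import AIMessage

# Hypothetical messages to illustrate the routing logic.
assert get_next_node(AIMessage(content="FINAL ANSWER: done"), "chart_generator") == END
assert get_next_node(AIMessage(content="Still researching..."), "chart_generator") == "chart_generator"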
Define the Graph¶
In [ ]:
from IPython.display import Image
from IPython.display import display
from langgraph.graph import START
from langgraph.graph import StateGraph

workflow = StateGraph(MessagesState)
workflow.add_node("researcher", research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)

workflow.add_edge(START, "researcher")
graph = workflow.compile()

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass
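If the PNG rendering fails for lack of optional dependencies, you can still inspect the graph topology by printing its Mermaid source, which needs no extra packages:
In [ ]:
# Fallback: print the Mermaid source for the compiled graph.
print(graph.get_graph().draw_mermaid())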
Instrument the Graph for Logging with TruLens¶
In [ ]:
class MultiAgentGraph:
    def __init__(self, graph: StateGraph):
        self.graph = graph

    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        },
    )
    def invoke_agent_graph(self, query: str) -> str:
        events = self.graph.stream(
            {
                "messages": [("user", query)],
            },
            # Maximum number of steps to take in the graph
            {"recursion_limit": 150},
        )
        # Drain the event stream, then return the content of the last message seen.
        for event in events:
            messages = list(event.values())[0]["messages"]
        return (
            messages[-1].content
            if messages and hasattr(messages[-1], "content")
            else ""
        )


multi_agent_graph = MultiAgentGraph(graph)
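Marking `invoke_agent_graph` as the `RECORD_ROOT` span means each call to it becomes one TruLens record, with the node spans defined above nested underneath it. If you want to smoke-test the wrapper first, you can call it directly; a sketch with a hypothetical query, commented out because it makes real LLM and search calls:
In [ ]:
# Optional smoke test; uncomment to run outside of TruLens recording.
# print(multi_agent_graph.invoke_agent_graph("What is the population of Japan?"))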
Initialize Feedback Function(s)¶
In [ ]:
import numpy as np
from trulens.core import Feedback
from trulens.core.feedback.selector import Selector
from trulens.providers.openai import OpenAI

provider = OpenAI(model_engine="gpt-4.1-mini")

# Define a groundedness feedback function
f_groundedness = (
    Feedback(
        provider.groundedness_measure_with_cot_reasons, name="Groundedness"
    )
    .on_context(collect_list=True)
    .on_output()
)

# Question/answer relevance between overall question and answer.
f_answer_relevance = (
    Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
    .on_input()
    .on_output()
)

# Context relevance between question and each context chunk.
f_context_relevance = (
    Feedback(
        provider.context_relevance_with_cot_reasons, name="Context Relevance"
    )
    .on_input()
    .on_context(collect_list=False)
    .aggregate(np.mean)  # choose a different aggregation method if you wish
)

# Trajectory evaluation: relevance of each trace step to the user query
f_step_relevance = Feedback(
    provider.trajectory_step_relevance_with_cot_reasons, name="Step Relevance"
).on({
    "trace": Selector(trace_level=True),
})

# Trajectory evaluation: logical consistency of the trace
f_logical_consistency = Feedback(
    provider.trajectory_logical_consistency_with_cot_reasons,
    name="Logical Consistency",
).on({
    "trace": Selector(trace_level=True),
})

# Trajectory evaluation: workflow efficiency of the trace
f_workflow_efficiency = Feedback(
    provider.trajectory_workflow_efficiency_with_cot_reasons,
    name="Workflow Efficiency",
).on({
    "trace": Selector(trace_level=True),
})
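To get a feel for what a feedback function produces before wiring it into an app, you can call the underlying provider method on a toy example. A minimal sketch (it makes one LLM call; the exact shape of the reasons may vary by TruLens version):
In [ ]:
# Direct call to the provider method behind f_answer_relevance.
score, reasons = provider.relevance_with_cot_reasons(
    prompt="What is the capital of France?",
    response="The capital of France is Paris.",
)
print(score)  # a float in [0, 1]
print(reasons)  # chain-of-thought style explanation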
Register the Graph with TruLens¶
In [ ]:
from trulens.apps.app import TruApp

tru_recorder = TruApp(
    multi_agent_graph,
    app_name="Multi-Agent Chart Generation",
    app_version="Base",
    feedbacks=[
        f_answer_relevance,
        f_context_relevance,
        f_groundedness,
        f_step_relevance,
        f_logical_consistency,
        f_workflow_efficiency,
    ],
)
Run the Graph¶
In [ ]:
with tru_recorder as recording:
    # Run the multi-agent graph with a sample query
    result = multi_agent_graph.invoke_agent_graph(
        "Generate a chart showing the trend of the US GDP and National Debt over the last 20 years."
    )
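The final answer is also returned directly, so you can inspect it without the dashboard:
In [ ]:
# The content of the last message produced by the graph.
print(result)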
Check Results¶
In [ ]:
session.get_leaderboard()
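Beyond the aggregated leaderboard, you can pull individual records and their feedback results into a DataFrame. A sketch, assuming the default TruLens column names:
In [ ]:
# Records with their feedback scores; feedback_cols lists the feedback column names.
records_df, feedback_cols = session.get_records_and_feedback()
records_df[["input", "output"] + feedback_cols].head()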
In [ ]:
from trulens.dashboard import run_dashboard
run_dashboard(session=session)