Multi-agent Data Tasks¶
Build a sales data agent that can operate across private and public data, perform research, answer questions, and generate charts.
This notebook also demonstrates the use of in-line evals to improve agent effectiveness for complex queries.
In [ ]:
%%capture --no-stderr
# pip install -U langchain_community langchain_openai langchain_experimental matplotlib langgraph google-search-results snowflake.core trulens-core trulens-connectors-snowflake trulens-providers-openai trulens-apps-langgraph
Choose an app name¶
In [ ]:
APP_NAME = "Sales Data Agent" # set this app name for your use case
Set the resources for Cortex Agent¶
In [ ]:
SEMANTIC_MODEL_FILE = "@sales_intelligence.data.models/sales_metrics_model.yaml"
CORTEX_SEARCH_SERVICE = "sales_intelligence.data.sales_conversation_search"
Set Keys¶
In [ ]:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["TAVILY_API_KEY"] = "tvly-dev-..."
# AI observability (TruLens) connection settings
os.environ["SNOWFLAKE_ACCOUNT"] = "..."
os.environ["SNOWFLAKE_USER"] = "..."
os.environ["SNOWFLAKE_USER_PASSWORD"] = "..."
os.environ["SNOWFLAKE_DATABASE"] = "SALES_INTELLIGENCE"
os.environ["SNOWFLAKE_SCHEMA"] = "DATA"
os.environ["SNOWFLAKE_ROLE"] = "ACCOUNTADMIN"
os.environ["SNOWFLAKE_WAREHOUSE"] = "COMPUTE_WH"
os.environ["SNOWFLAKE_PAT"] = "..." # cortex agent call
os.environ["TRULENS_OTEL_TRACING"] = (
"1" # to enable OTEL tracing
)
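If you prefer not to hardcode secrets in the notebook, the same variables can be loaded from a local `.env` file instead. A minimal sketch, assuming the `python-dotenv` package is installed and the `.env` file uses the variable names above:
In [ ]:
# Optional alternative: load the same variables from a local .env file
# instead of hardcoding them (assumes `pip install python-dotenv`).
from dotenv import load_dotenv
load_dotenv()  # reads OPENAI_API_KEY, TAVILY_API_KEY, SNOWFLAKE_*, etc. into os.environ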
Run the following setup SQL in your Snowflake account to create resources¶
In [ ]:
# -- Create database and schema
# CREATE OR REPLACE DATABASE sales_intelligence;
# CREATE OR REPLACE SCHEMA sales_intelligence.data;
# CREATE OR REPLACE WAREHOUSE sales_intelligence_wh;
# USE DATABASE sales_intelligence;
# USE SCHEMA data;
# -- Create tables for sales data
# CREATE TABLE sales_conversations (
# conversation_id VARCHAR,
# transcript_text TEXT,
# customer_name VARCHAR,
# deal_stage VARCHAR,
# sales_rep VARCHAR,
# conversation_date TIMESTAMP,
# deal_value FLOAT,
# product_line VARCHAR
# );
# CREATE TABLE sales_metrics (
# deal_id VARCHAR,
# customer_name VARCHAR,
# deal_value FLOAT,
# close_date DATE,
# sales_stage VARCHAR,
# win_status BOOLEAN,
# sales_rep VARCHAR,
# product_line VARCHAR
# );
# -- First, let's insert data into sales_conversations
# INSERT INTO sales_conversations
# (conversation_id, transcript_text, customer_name, deal_stage, sales_rep, conversation_date, deal_value, product_line)
# VALUES
# ('CONV001', 'Initial discovery call with TechCorp Inc''s IT Director and Solutions Architect. Client showed strong interest in our enterprise solution features, particularly the automated workflow capabilities. The main discussion centered around integration timeline and complexity. They currently use Legacy System X for their core operations and expressed concerns about potential disruption during migration. The team asked detailed questions about API compatibility and data migration tools.
# Action items include providing a detailed integration timeline document, scheduling a technical deep-dive with their infrastructure team, and sharing case studies of similar Legacy System X migrations. The client mentioned a Q2 budget allocation for digital transformation initiatives. Overall, it was a positive engagement with clear next steps.', 'TechCorp Inc', 'Discovery', 'Sarah Johnson', '2024-01-15 10:30:00', 75000, 'Enterprise Suite'),
# ('CONV002', 'Follow-up call with SmallBiz Solutions'' Operations Manager and Finance Director. The primary focus was on pricing structure and ROI timeline. They compared our Basic Package pricing with Competitor Y''s small business offering. Key discussion points included monthly vs. annual billing options, user license limitations, and potential cost savings from process automation.
# The client requested a detailed ROI analysis focusing on time saved in daily operations, resource allocation improvements, and projected efficiency gains. Budget constraints were clearly communicated, with a maximum budget of $30K for this year. They showed interest in starting with the basic package with room for a potential upgrade in Q4. Next steps include providing a competitive analysis and a customized ROI calculator by next week.', 'SmallBiz Solutions', 'Negotiation', 'Mike Chen', '2024-01-16 14:45:00', 25000, 'Basic Package'),
# ('CONV003', 'Strategy session with SecureBank Ltd''s CISO and Security Operations team. Extremely positive 90-minute deep dive into our Premium Security package. Customer emphasized immediate need for implementation due to recent industry compliance updates. Our advanced security features, especially multi-factor authentication and encryption protocols, were identified as perfect fits for their requirements. Technical team was particularly impressed with our zero-trust architecture approach and real-time threat monitoring capabilities. They''ve already secured budget approval and have executive buy-in. Compliance documentation is ready for review. Action items include: finalizing implementation timeline, scheduling security audit, and preparing necessary documentation for their risk assessment team. Client ready to move forward with contract discussions.', 'SecureBank Ltd', 'Closing', 'Rachel Torres', '2024-01-17 11:20:00', 150000, 'Premium Security'),
# ('CONV004', 'Comprehensive discovery call with GrowthStart Up''s CTO and Department Heads. Team of 500+ employees across 3 continents discussed current challenges with their existing solution. Major pain points identified: system crashes during peak usage, limited cross-department reporting capabilities, and poor scalability for remote teams. Deep dive into their current workflow revealed bottlenecks in data sharing and collaboration. Technical requirements gathered for each department. Platform demo focused on scalability features and global team management capabilities. Client particularly interested in our API ecosystem and custom reporting engine. Next steps: schedule department-specific workflow analysis and prepare detailed platform migration plan.', 'GrowthStart Up', 'Discovery', 'Sarah Johnson', '2024-01-18 09:15:00', 100000, 'Enterprise Suite'),
# ('CONV005', 'In-depth demo session with DataDriven Co''s Analytics team and Business Intelligence managers. Showcase focused on advanced analytics capabilities, custom dashboard creation, and real-time data processing features. Team was particularly impressed with our machine learning integration and predictive analytics models. Competitor comparison requested specifically against Market Leader Z and Innovative Start-up X. Price point falls within their allocated budget range, but team expressed interest in multi-year commitment with corresponding discount structure. Technical questions centered around data warehouse integration and custom visualization capabilities. Action items: prepare detailed competitor feature comparison matrix and draft multi-year pricing proposals with various discount scenarios.', 'DataDriven Co', 'Demo', 'James Wilson', '2024-01-19 13:30:00', 85000, 'Analytics Pro'),
# ('CONV006', 'Extended technical deep dive with HealthTech Solutions'' IT Security team, Compliance Officer, and System Architects. Four-hour session focused on API infrastructure, data security protocols, and compliance requirements. Team raised specific concerns about HIPAA compliance, data encryption standards, and API rate limiting. Detailed discussion of our security architecture, including: end-to-end encryption, audit logging, and disaster recovery protocols. Client requires extensive documentation on compliance certifications, particularly SOC 2 and HITRUST. Security team performed initial architecture review and requested additional information about: database segregation, backup procedures, and incident response protocols. Follow-up session scheduled with their compliance team next week.', 'HealthTech Solutions', 'Technical Review', 'Rachel Torres', '2024-01-20 15:45:00', 120000, 'Premium Security'),
# ('CONV007', 'Contract review meeting with LegalEase Corp''s General Counsel, Procurement Director, and IT Manager. Detailed analysis of SLA terms, focusing on uptime guarantees and support response times. Legal team requested specific modifications to liability clauses and data handling agreements. Procurement raised questions about payment terms and service credit structure. Key discussion points included: disaster recovery commitments, data retention policies, and exit clause specifications. IT Manager confirmed technical requirements are met pending final security assessment. Agreement reached on most terms, with only SLA modifications remaining for discussion. Legal team to provide revised contract language by end of week. Overall positive session with clear path to closing.', 'LegalEase Corp', 'Negotiation', 'Mike Chen', '2024-01-21 10:00:00', 95000, 'Enterprise Suite'),
# ('CONV008', 'Quarterly business review with GlobalTrade Inc''s current implementation team and potential expansion stakeholders. Current implementation in Finance department showcasing strong adoption rates and 40% improvement in processing times. Discussion focused on expanding solution to Operations and HR departments. Users highlighted positive experiences with customer support and platform stability. Challenges identified in current usage: need for additional custom reports and increased automation in workflow processes. Expansion requirements gathered from Operations Director: inventory management integration, supplier portal access, and enhanced tracking capabilities. HR team interested in recruitment and onboarding workflow automation. Next steps: prepare department-specific implementation plans and ROI analysis for expansion.', 'GlobalTrade Inc', 'Expansion', 'James Wilson', '2024-01-22 14:20:00', 45000, 'Basic Package'),
# ('CONV009', 'Emergency planning session with FastTrack Ltd''s Executive team and Project Managers. Critical need for rapid implementation due to current system failure. Team willing to pay premium for expedited deployment and dedicated support team. Detailed discussion of accelerated implementation timeline and resource requirements. Key requirements: minimal disruption to operations, phased data migration, and emergency support protocols. Technical team confident in meeting aggressive timeline with additional resources. Executive sponsor emphasized importance of going live within 30 days. Immediate next steps: finalize expedited implementation plan, assign dedicated support team, and begin emergency onboarding procedures. Team to reconvene daily for progress updates.', 'FastTrack Ltd', 'Closing', 'Sarah Johnson', '2024-01-23 16:30:00', 180000, 'Premium Security'),
# ('CONV010', 'Quarterly strategic review with UpgradeNow Corp''s Department Heads and Analytics team. Current implementation meeting basic needs but team requiring more sophisticated analytics capabilities. Deep dive into current usage patterns revealed opportunities for workflow optimization and advanced reporting needs. Users expressed strong satisfaction with platform stability and basic features, but requiring enhanced data visualization and predictive analytics capabilities. Analytics team presented specific requirements: custom dashboard creation, advanced data modeling tools, and integrated BI features. Discussion about upgrade path from current package to Analytics Pro tier. ROI analysis presented showing potential 60% improvement in reporting efficiency. Team to present upgrade proposal to executive committee next month.', 'UpgradeNow Corp', 'Expansion', 'Rachel Torres', '2024-01-24 11:45:00', 65000, 'Analytics Pro');
# -- Now, let's insert corresponding data into sales_metrics
# INSERT INTO sales_metrics
# (deal_id, customer_name, deal_value, close_date, sales_stage, win_status, sales_rep, product_line)
# VALUES
# ('DEAL001', 'TechCorp Inc', 75000, '2024-02-15', 'Closed', true, 'Sarah Johnson', 'Enterprise Suite'),
# ('DEAL002', 'SmallBiz Solutions', 25000, '2024-02-01', 'Lost', false, 'Mike Chen', 'Basic Package'),
# ('DEAL003', 'SecureBank Ltd', 150000, '2024-01-30', 'Closed', true, 'Rachel Torres', 'Premium Security'),
# ('DEAL004', 'GrowthStart Up', 100000, '2024-02-10', 'Pending', false, 'Sarah Johnson', 'Enterprise Suite'),
# ('DEAL005', 'DataDriven Co', 85000, '2024-02-05', 'Closed', true, 'James Wilson', 'Analytics Pro'),
# ('DEAL006', 'HealthTech Solutions', 120000, '2024-02-20', 'Pending', false, 'Rachel Torres', 'Premium Security'),
# ('DEAL007', 'LegalEase Corp', 95000, '2024-01-25', 'Closed', true, 'Mike Chen', 'Enterprise Suite'),
# ('DEAL008', 'GlobalTrade Inc', 45000, '2024-02-08', 'Closed', true, 'James Wilson', 'Basic Package'),
# ('DEAL009', 'FastTrack Ltd', 180000, '2024-02-12', 'Closed', true, 'Sarah Johnson', 'Premium Security'),
# ('DEAL010', 'UpgradeNow Corp', 65000, '2024-02-18', 'Pending', false, 'Rachel Torres', 'Analytics Pro');
# -- Enable change tracking
# ALTER TABLE sales_conversations SET CHANGE_TRACKING = TRUE;
# -- Create the search service
# CREATE OR REPLACE CORTEX SEARCH SERVICE sales_conversation_search
# ON transcript_text
# ATTRIBUTES customer_name, deal_stage, sales_rep, product_line, conversation_date, deal_value
# WAREHOUSE = sales_intelligence_wh
# TARGET_LAG = '1 minute'
# AS (
# SELECT
# conversation_id,
# transcript_text,
# customer_name,
# deal_stage,
# sales_rep,
# conversation_date,
# deal_value,
# product_line
# FROM sales_conversations
# WHERE conversation_date >= '2024-01-01' -- Fixed date instead of CURRENT_TIMESTAMP
# );
# CREATE OR REPLACE STAGE models
# DIRECTORY = (ENABLE = TRUE);
Upload the semantic model:¶
Download sales_metrics_model.yaml (NOTE: Do NOT right-click to download.)
Navigate to Data » Databases » SALES_INTELLIGENCE » DATA » Stages » MODELS.
Click "+ Files" in the top right.
Browse to and select the sales_metrics_model.yaml file.
Click "Upload".
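Alternatively, you can upload the file to the stage programmatically rather than through Snowsight. A minimal sketch using Snowpark's `file.put`, assuming `sales_metrics_model.yaml` sits in your working directory and reusing the Snowpark session created later in this notebook (any session connected to the account works):
In [ ]:
# Optional: upload the semantic model YAML to the @models stage with Snowpark
# instead of the Snowsight UI. Assumes the file is in the current directory and
# that a Snowpark session (e.g. `snowpark_session_trulens`, created below) exists.
snowpark_session_trulens.file.put(
    "sales_metrics_model.yaml",
    "@sales_intelligence.data.models",
    auto_compress=False,  # keep the plain .yaml extension on the stage
    overwrite=True,
)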
Create Network Policy and PAT¶
In [ ]:
# /*------------------------------------------------------------*/
# /* 1) As ACCOUNTADMIN: create a place to store the rule */
# /*------------------------------------------------------------*/
# USE ROLE ACCOUNTADMIN;
# CREATE OR REPLACE DATABASE securitydb;
# CREATE OR REPLACE SCHEMA securitydb.myrules;
# /* Give SECURITYADMIN everything it needs in that schema */
# GRANT USAGE ON DATABASE securitydb TO ROLE SECURITYADMIN;
# GRANT USAGE ON SCHEMA securitydb.myrules TO ROLE SECURITYADMIN;
# GRANT CREATE NETWORK RULE ON SCHEMA securitydb.myrules TO ROLE SECURITYADMIN;
# /*------------------------------------------------------------*/
# /* 2) Switch to SECURITYADMIN and create the rule */
# /*------------------------------------------------------------*/
# USE ROLE SECURITYADMIN;
# USE DATABASE securitydb;
# USE SCHEMA myrules;
# CREATE OR REPLACE NETWORK RULE allow_all_ingress
# MODE = INGRESS -- protects service (and internal stage if enabled)
# TYPE = IPV4
# VALUE_LIST = ('0.0.0.0/0') -- “everything”
# COMMENT = 'Allows all IPv4 traffic (public + private)';
# /*------------------------------------------------------------*/
# /* 3) Create a network policy with the rule */
# /*------------------------------------------------------------*/
# CREATE OR REPLACE NETWORK POLICY open_access_policy
# ALLOWED_NETWORK_RULE_LIST = ('allow_all_ingress')
# COMMENT = 'Permits all IPv4 traffic – used for PAT testing';
# /*------------------------------------------------------------*/
# /* 4) Attach the policy to the account */
# /*------------------------------------------------------------*/
# ALTER ACCOUNT SET NETWORK_POLICY = open_access_policy;
# ALTER USER ADD PROGRAMMATIC ACCESS TOKEN agents_token;
Import libraries¶
In [ ]:
import os
import uuid
from typing import Annotated, Any, List, Literal, Type
import pandas as pd
from langchain.load.dump import dumps
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import StructuredTool, Tool, tool
from langchain_experimental.utilities import PythonREPL
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from pydantic import BaseModel, PrivateAttr
from snowflake.core import Root
from snowflake.core.cortex.lite_agent_service import AgentRunRequest
from snowflake.snowpark import Session
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.core.otel.instrument import instrument
from trulens.core.run import Run, RunConfig
from trulens.otel.semconv.trace import BASE_SCOPE, SpanAttributes
Create TruLens/Snowflake Connection¶
In [ ]:
# Snowflake connection used by TruLens for observability
snowflake_connection_parameters = {
"account": os.environ["SNOWFLAKE_ACCOUNT"],
"user": os.environ["SNOWFLAKE_USER"],
"password": os.environ["SNOWFLAKE_USER_PASSWORD"],
"database": os.environ["SNOWFLAKE_DATABASE"],
"schema": os.environ["SNOWFLAKE_SCHEMA"],
"role": os.environ["SNOWFLAKE_ROLE"],
"warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
}
snowpark_session_trulens = Session.builder.configs(
snowflake_connection_parameters
).create()
trulens_sf_connector = SnowflakeConnector(
snowpark_session=snowpark_session_trulens
)
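Optionally, verify that the Snowpark session landed in the expected context before wiring it into TruLens (a quick sanity check, not required):
In [ ]:
# Optional sanity check: confirm role, warehouse, database, and schema.
print(
    snowpark_session_trulens.sql(
        "select current_role(), current_warehouse(), current_database(), current_schema()"
    ).collect()
)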
Initialize custom state¶
In [ ]:
from typing import Literal, Optional, List, Dict, Any
from langgraph.graph import MessagesState
# Custom State class with specific keys
class State(MessagesState):
plan: Optional[List[Dict[str, Dict[str, Any]]]]
user_query: Optional[str]
current_step: int
replan_flag: Optional[bool]
last_reason: Optional[str]
replan_attempts: Optional[Dict[int, Dict[int, int]]]
agent_query: Optional[str]
def get_next_node(last_message: BaseMessage, goto: str):
if "FINAL ANSWER" in last_message.content:
# Any agent decided the work is done
return END
return goto
MAX_REPLANS = 3
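For reference, this is the shape of a single plan version that the planner (defined next) appends to `state["plan"]` and that the orchestrator later indexes by string step number. The values below are illustrative only:
In [ ]:
# Illustrative only: one plan version, keyed by step number as a string,
# matching the structure the planner returns and the orchestrator consumes.
example_plan_version = {
    "1": {
        "agent": "cortex_researcher",
        "action": "Query sales_metrics for closed deal values by product line",
        "goal": "Structured deal data for later steps",
        "pre_conditions": [],
        "post_conditions": ["Deal values per product line are available"],
    },
    "2": {
        "agent": "synthesizer",
        "action": "Summarize the findings from step 1",
        "goal": "Short prose answer for the user",
        "pre_conditions": ["Step 1 data available"],
        "post_conditions": ["Final answer produced"],
    },
}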
Create planner¶
In [ ]:
from typing import Literal, Dict, Any
import json
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
# ── LLMs ────────────────────────────────────────────────────────────────
planner_llm = ChatOpenAI(
model_name="gpt-4o",
response_format={"type": "json_object"},
)
# same for the reasoning/orchestrator LLM if it parses JSON too
reasoning_llm = ChatOpenAI(
model_name="o3-mini",
response_format={"type": "json_object"},
temperature=1
)
# ── PLANNER PROMPT ──────────────────────────────────────────────────────
def plan_prompt(state: Dict[str, Any]) -> HumanMessage:
"""
Build the prompt that instructs the LLM to return a high‑level plan.
"""
replan_flag = state.get("replan_flag", False)
user_query = state.get("user_query", state["messages"][0].content)
prior_plan = state.get("plan", [])[-1] if state.get("plan") else {}
replan_reason = state.get("last_reason", "")
prompt = f"""
You are the **Planner** in a multi‑agent system. Break the user's request
into a sequence of numbered steps (1, 2, 3, …). **There is no hard limit on
step count** as long as the plan is concise and each step has a clear goal.
You may decompose the user's query into sub-queries, each of which is a
separate step. For example, if the user's query is "What were the key
action items in the last quarter, and what was a recent news story for
each of them?", you may break it into steps:
1. Fetch the key action items in the last quarter.
2. Fetch a recent news story for the first action item.
3. Fetch a recent news story for the second action item.
4. Fetch a recent news story for the last action item.
Here is a list of available agents you can call upon to execute the tasks in your plan. You may call only one agent per step.
• `web_researcher` – fetch public data via Tavily
• `cortex_researcher` – fetch private/company data via Snowflake Cortex Agents
• `chart_generator` – build visualizations from structured data
• `synthesizer` – write a short prose summary of the findings.
Return **ONLY** valid JSON (no markdown, no explanations) in this form:
{{
"1": {{
"agent": "web_researcher | cortex_researcher | chart_generator | synthesizer",
"action": "string",
"goal": "string",
"pre_conditions": ["string", ...],
"post_conditions": ["string", ...]
}},
"2": {{ ... }},
"3": {{ ... }}
}}
Guidelines:
- Use `cortex_researcher` when Snowflake/Cortex/private data is required.
- Use `web_researcher` for public‑web facts.
- **Include `chart_generator` _only_ if the user’s wording requires a
visualisation** (keywords: chart, graph, plot, visualise, bar‑chart,
line‑chart, histogram, etc.). If included, `chart_generator` must be
the final step after required data is gathered. Visualizations should
include all of the data from the previous steps that is reasonable for
the chart type.
- Otherwise, use `synthesizer` as the final step, and be sure to include
all of the data from the previous steps.
"""
if replan_flag:
prompt += f"""
The previous plan needs revision because: {replan_reason}
Previous plan:
{json.dumps(prior_plan, indent=2)}
When replanning:
- Identify the failed or incomplete step and **rewrite** that step (keep its number).
- Leave other valid steps unchanged.
"""
else:
prompt += "\nGenerate a new plan from scratch."
prompt += f'\nUser query: "{user_query}"'
return HumanMessage(content=prompt)
# ── PLANNER NODE ────────────────────────────────────────────────────────
@instrument()
def planner_node(state: State) \
-> "Command[Literal['orchestrator']]":
"""
Runs the planning LLM and stores the resulting plan in state.
"""
# 1. Invoke LLM with the planner prompt
llm_reply = planner_llm.invoke([plan_prompt(state)])
# 2. Validate JSON
try:
parsed_plan = json.loads(llm_reply.content)
except json.JSONDecodeError:
raise ValueError(f"Planner returned invalid JSON:\n{llm_reply.content}")
# 3. Merge into state
existing_plans = state.get("plan", [])
replan = state.get("replan_flag", False)
updated_plan = existing_plans + [parsed_plan] if replan else [parsed_plan]
return Command(
update={
"plan": updated_plan,
"messages": [HumanMessage(
content=llm_reply.content,
name="replan" if replan else "initial_plan"
)],
"user_query": state.get("user_query",
state["messages"][0].content),
"current_step": 1 if not replan else state["current_step"],
"replan_flag": False, # reset – we just replanned
"last_reason": "",
},
goto="orchestrator",
)
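To see what the planner produces before wiring up the full graph, you can invoke it directly on a sample query (optional; the JSON keys match the schema in the prompt above):
In [ ]:
# Optional sanity check: call the planner LLM on a sample query and
# pretty-print the JSON plan it returns.
sample_state = {
    "messages": [
        HumanMessage(
            content="Chart closed deal values by product line and find one "
            "recent news story about the top-selling product category."
        )
    ]
}
raw_plan = planner_llm.invoke([plan_prompt(sample_state)])
print(json.dumps(json.loads(raw_plan.content), indent=2))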
Create orchestrator¶
In [ ]:
from __future__ import annotations
import json
from typing import Dict, Any, Literal
from langchain_core.messages import BaseMessage, HumanMessage
# You already have these in your code base:
# - reasoning_llm : ChatOpenAI
# - Command, State : LangGraph types
MAX_REPLANS = 3 # feel free to tweak
# ────────────────────────────────────────────────────────────────────────
# Prompt builder
# ────────────────────────────────────────────────────────────────────────
def orchestrator_prompt(state: Dict[str, Any]) -> HumanMessage:
"""
Build the single‑turn JSON prompt that drives the orchestrator LLM.
"""
step = state.get("current_step", 0)
plan_block = state["plan"][-1][str(step)]
pre_conditions = plan_block["pre_conditions"]
max_replans = MAX_REPLANS
step_replans = state.get("replan_attempts", {}).get(step, {})
attempts = step_replans.get(len(state["plan"]) - 1, 0)
return HumanMessage(
content=f"""
You are the **Orchestrator** in a multi‑agent system with five agents:
`web_researcher`, `cortex_researcher`, `chart_generator`, `synthesizer`, and `planner`.
**Tasks**
1. Decide if the current plan needs revision. → `"replan_flag": true|false`
2. Decide which agent to run next. → `"goto": "<agent_name>"`
3. Give one‑sentence justification. → `"reason": "<text>"`
4. Write the exact question that the chosen agent should answer
→ "query": "<text>"
**Guidelines**
- Use `"web_researcher"` when *public* info is missing.
- Use `"cortex_researcher"` for *private/Snowflake* data.
- After **{MAX_REPLANS}** failed replans for the same step, move on.
- If you *just replanned* (replan_flag is true) let the assigned agent try before
requesting another replan.
**Inputs**
- User query ..............: {state.get("user_query", "[missing]")}
- Current plan (latest) ...: {state.get("plan")[-1] if state.get("plan") else "[none]"}
- Current step index ......: {step}
- Pre-conditions for step .: {pre_conditions}
- Just replanned flag .....: {state.get("replan_flag")}
Respond **only** with valid JSON (no additional text):
{{
"replan": <true|false>,
"goto": "<web_researcher|cortex_researcher|chart_generator|planner>",
"reason": "<1 sentence>",
"query": "<text>"
}}
### Decide "replan"
1. Review the last few agent messages and consider if the plan is still valid.
2. If the plan is still valid, set `"replan": false`
(run the step or move on).
3. Otherwise, set `"replan": true` **only if**
• the missing information is **not** expected to be produced by any of
the remaining steps **and**
• `attempts < {max_replans}`.
When `attempts == {max_replans}`, skip the step instead of replanning
(`"goto"` the next planned agent).
### Decide `"goto"`
- If `"replan": true` → `"goto": "planner"`.
- Otherwise choose the agent already assigned to this step
(`{plan_block['agent']}`).
### Build `"query"`
Write a clear, standalone instruction for the chosen agent. If the chosen agent
is `web_researcher` or `cortex_researcher`, the query should be a standalone search query.
Context you can rely on
- User query ..............: {state.get("user_query")}
- Current step index ......: {step}
- Current plan step .......: {plan_block}
- Pre-conditions for step .: {pre_conditions}
- Just‑replanned flag .....: {state.get("replan_flag")}
- Previous messages including research evaluations.......: {state.get("messages")[-4:]}
Respond **only** with JSON, no extra text.
"""
)
# ────────────────────────────────────────────────────────────────────────
# Orchestrator node
# ────────────────────────────────────────────────────────────────────────
@instrument()
def orchestrator_node(
state: State,
) -> Command[
Literal["web_researcher", "cortex_researcher", "chart_generator", "synthesizer","planner", END]
]:
"""
Central decision‑maker.
1. Builds a prompt from `state`, calls the LLM, and parses the JSON reply.
2. Applies guard‑rails to prevent infinite replans and out‑of‑range steps.
3. Returns a `Command` with state updates and the next node to execute.
"""
# ── 0. No plan yet? -> ask the planner for one ──────────────────────
if not state.get("plan"):
return Command(
goto="planner",
update={
"replan_flag": True,
"last_reason": "No current plan is available; create one.",
"current_step": 1,
},
)
latest_plan: Dict[str, Any] = state["plan"][-1]
step: int = state.get("current_step", 1)
# ── 1. Finished all steps? -> go to END ─────────────────────────
if str(step) not in latest_plan:
return Command(goto=END, update={})
# ── 2. Build prompt & call LLM ────────────────────────────────────
llm_reply = reasoning_llm.invoke([orchestrator_prompt(state)])
try:
parsed = json.loads(llm_reply.content)
replan: bool = parsed["replan"]
goto: str = parsed["goto"]
reason: str = parsed["reason"]
query: str = parsed["query"]
except Exception as exc:
raise ValueError(
f"Invalid orchestrator JSON:\n{llm_reply.content}"
) from exc
# ── 3. Track how many times we've replanned this (step, plan‑ver) ──
plan_version = len(state["plan"]) - 1
replans: Dict[int, Dict[int, int]] = state.get("replan_attempts", {})
step_replans = replans.get(step, {}).get(plan_version, 0)
# ── 4. Assemble updates common to every exit path ─────────────────
updates: Dict[str, Any] = {
"messages": [HumanMessage(content=llm_reply.content, name="orchestrator")],
"last_reason": reason,
"agent_query": query,
}
# ── 5. If we *just* replanned, run the agent without replanning again
if state.get("replan_flag"):
updates["replan_flag"] = False
updates["current_step"] = step # stay on same step
assigned_agent = latest_plan[str(step)]["agent"]
return Command(update=updates, goto=assigned_agent)
# ── 6. Too many replans for this step -> skip to next step ─────────
if replan and step_replans >= MAX_REPLANS:
updates["replan_flag"] = False
updates["current_step"] = step + 1
next_agent = latest_plan.get(str(step + 1), {}).get("agent", END)
return Command(update=updates, goto=next_agent)
# ── 7. Normal replan request ──────────────────────────────────────
if replan:
replans.setdefault(step, {})[plan_version] = step_replans + 1
updates.update(
{
"replan_attempts": replans,
"replan_flag": True,
"current_step": step, # retry same step after replanning
}
)
return Command(update=updates, goto="planner")
# ── 8. Happy path: run the chosen agent ───────────────────────────
updates["replan_flag"] = False
# Increment step only if we’re following the planned agent
planned_agent = latest_plan[str(step)]["agent"]
if goto == planned_agent:
updates["current_step"] = step + 1
else:
updates["current_step"] = step # planner or evaluator may override
return Command(update=updates, goto=goto)
from __future__ import annotations
import json
from typing import Dict, Any, Literal
from langchain.schema import HumanMessage, BaseMessage
# You already have these in your code base:
# - reasoning_llm : ChatOpenAI
# - Command, State : LangGraph types
MAX_REPLANS = 3 # feel free to tweak
# ────────────────────────────────────────────────────────────────────────
# Prompt builder
# ────────────────────────────────────────────────────────────────────────
def orchestrator_prompt(state: Dict[str, Any]) -> HumanMessage:
"""
Build the single‑turn JSON prompt that drives the orchestrator LLM.
"""
step = state.get("current_step", 0)
plan_block = state["plan"][-1][str(step)]
pre_conditions = plan_block["pre_conditions"]
max_replans = MAX_REPLANS
step_replans = state.get("replan_attempts", {}).get(step, {})
attempts = step_replans.get(len(state["plan"]) - 1, 0)
return HumanMessage(
content=f"""
You are the **Orchestrator** in a multi‑agent system with four agents:
`web_researcher`, `cortex_researcher`, `chart_generator`, and `planner`.
**Tasks**
1. Decide if the current plan needs revision. → `"replan_flag": true|false`
2. Decide which agent to run next. → `"goto": ""`
3. Give one‑sentence justification. → `"reason": ""`
4. Write the exact question that the chosen agent should answer
→ "query": ""
**Guidelines**
- Use `"web_researcher"` when *public* info is missing.
- Use `"cortex_researcher"` for *private/Snowflake* data.
- After **{MAX_REPLANS}** failed replans for the same step, move on.
- If you *just replanned* (replan_flag is true) let the assigned agent try before
requesting another replan.
**Inputs**
- User query ..............: {state.get("user_query", "[missing]")}
- Current plan (latest) ...: {state.get("plan")[-1] if state.get("plan") else "[none]"}
- Current step index ......: {step}
- Pre-conditions for step .: {pre_conditions}
- Just replanned flag .....: {state.get("replan_flag")}
Respond **only** with valid JSON (no additional text):
{{
"replan": ,
"goto": "",
"reason": "<1 sentence>",
"query": ""
}}
### Decide "replan"
1. Review the last few agent messages and consider if the plan is still valid.
2. If the plan is still valid, set `"replan": false`
(run the step or move on).
3. Otherwise, set `"replan": true` **only if**
• the missing information is **not** expected to be produced by any of
the remaining steps **and**
• `attempts < {max_replans}`.
When `attempts == {max_replans}`, skip the step instead of replanning
(`"goto"` the next planned agent).
### Decide `"goto"`
- If `"replan": true` → `"goto": "planner"`.
- Otherwise choose the agent already assigned to this step
(`{plan_block['agent']}`).
### Build `"query"`
Write a clear, standalone instruction for the chosen agent. If the chosen agent
is `web_researcher` or `cortex_researcher`, the query should be a standalone search query.
Context you can rely on
- User query ..............: {state.get("user_query")}
- Current step index ......: {step}
- Current plan step .......: {plan_block}
- Pre-conditions for step .: {pre_conditions}
- Just‑replanned flag .....: {state.get("replan_flag")}
- Previous messages including research evaluations.......: {state.get("messages")[-4:]}
Respond **only** with JSON, no extra text.
"""
)
# ────────────────────────────────────────────────────────────────────────
# Orchestrator node
# ────────────────────────────────────────────────────────────────────────
@instrument()
def orchestrator_node(
state: State,
) -> Command[
Literal["web_researcher", "cortex_researcher", "chart_generator", "synthesizer","planner", END]
]:
"""
Central decision‑maker.
1. Builds a prompt from `state`, calls the LLM, and parses the JSON reply.
2. Applies guard‑rails to prevent infinite replans and out‑of‑range steps.
3. Returns a `Command` with state updates and the next node to execute.
"""
# ── 0. No plan yet? -> ask the planner for one ──────────────────────
if not state.get("plan"):
return Command(
goto="planner",
update={
"replan_flag": True,
"last_reason": "No current plan is available; create one.",
"current_step": 1,
},
)
latest_plan: Dict[str, Any] = state["plan"][-1]
step: int = state.get("current_step", 1)
# ── 1. Finished all steps? -> go to END ─────────────────────────
if str(step) not in latest_plan:
return Command(goto=END, update={})
# ── 2. Build prompt & call LLM ────────────────────────────────────
llm_reply = reasoning_llm.invoke([orchestrator_prompt(state)])
try:
parsed = json.loads(llm_reply.content)
replan: bool = parsed["replan"]
goto: str = parsed["goto"]
reason: str = parsed["reason"]
query: str = parsed["query"]
except Exception as exc:
raise ValueError(
f"Invalid orchestrator JSON:\n{llm_reply.content}"
) from exc
# ── 3. Track how many times we've replanned this (step, plan‑ver) ──
plan_version = len(state["plan"]) - 1
replans: Dict[int, Dict[int, int]] = state.get("replan_attempts", {})
step_replans = replans.get(step, {}).get(plan_version, 0)
# ── 4. Assemble updates common to every exit path ─────────────────
updates: Dict[str, Any] = {
"messages": [HumanMessage(content=llm_reply.content, name="orchestrator")],
"last_reason": reason,
"agent_query": query,
}
# ── 5. If we *just* replanned, run the agent without replanning again
if state.get("replan_flag"):
updates["replan_flag"] = False
updates["current_step"] = step # stay on same step
assigned_agent = latest_plan[str(step)]["agent"]
return Command(update=updates, goto=assigned_agent)
# ── 6. Too many replans for this step -> skip to next step ─────────
if replan and step_replans >= MAX_REPLANS:
updates["replan_flag"] = False
updates["current_step"] = step + 1
next_agent = latest_plan.get(str(step + 1), {}).get("agent", END)
return Command(update=updates, goto=next_agent)
# ── 7. Normal replan request ──────────────────────────────────────
if replan:
replans.setdefault(step, {})[plan_version] = step_replans + 1
updates.update(
{
"replan_attempts": replans,
"replan_flag": True,
"current_step": step, # retry same step after replanning
}
)
return Command(update=updates, goto="planner")
# ── 8. Happy path: run the chosen agent ───────────────────────────
updates["replan_flag"] = False
# Increment step only if we’re following the planned agent
planned_agent = latest_plan[str(step)]["agent"]
if goto == planned_agent:
updates["current_step"] = step + 1
else:
updates["current_step"] = step # planner or evaluator may override
return Command(update=updates, goto=goto)
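For reference, a well-formed orchestrator reply that the parsing code above expects looks like the following (values are illustrative only; "goto" must name the planner or an agent from the current plan):
{
  "replan": false,
  "goto": "cortex_researcher",
  "reason": "The step can run with the information already gathered.",
  "query": "List the top 3 client deals by deal value with customer names."
}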
Create agent system prompt¶
In [ ]:
Copied!
def make_system_prompt(suffix: str) -> str:
return (
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK, another assistant with different tools "
" will help where you left off. Execute what you can to make progress."
" If you or any of the other assistants have the final answer or deliverable,"
" prefix your response with FINAL ANSWER so the team knows to stop."
f"\n{suffix}"
)
def make_system_prompt(suffix: str) -> str:
return (
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK, another assistant with different tools "
" will help where you left off. Execute what you can to make progress."
" If you or any of the other assistants have the final answer or deliverable,"
" prefix your response with FINAL ANSWER so the team knows to stop."
f"\n{suffix}"
)
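For example, the charting agent later in this notebook is built with a suffix like the one below; the helper simply appends it to the shared collaboration instructions:
print(make_system_prompt("You can only generate charts. You are working with a researcher colleague."))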
Initialize Cortex Agent for Doc Search + SQL¶
In [ ]:
Copied!
class CortexAgentArgs(BaseModel):
query: str
class CortexAgentTool(StructuredTool):
name: str = "CortexAgent"
description: str = "answers questions using sales conversations and metrics"
args_schema: Type[CortexAgentArgs] = CortexAgentArgs
# Pydantic-compatible private attributes (not validated or required in __init__)
_session: Session = PrivateAttr()
_root: Root = PrivateAttr()
_agent_service: Any = PrivateAttr()
def __init__(self, session: Session):
# initialize parent class without passing custom fields
super().__init__()
self._session = session
self._root = Root(session)
self._agent_service = self._root.cortex_agent_service
def _build_request(self, query: str) -> AgentRunRequest:
return AgentRunRequest.from_dict({
"model": "claude-3-5-sonnet",
"tools": [
{"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "analyst1"}},
{"tool_spec": {"type": "cortex_search", "name": "search1"}},
],
"tool_resources": {
"analyst1": {"semantic_model_file": SEMANTIC_MODEL_FILE},
"search1": {
"name": CORTEX_SEARCH_SERVICE,
"max_results": 10,
"id_column": "conversation_id"
}
},
"messages": [
{"role": "user", "content": [{"type": "text", "text": query}]}
]
})
def _consume_stream(self, stream):
text, sql, citations = "", "", []
for evt in stream.events():
try:
delta = (evt.data.get("delta") if isinstance(evt.data, dict)
else json.loads(evt.data).get("delta")
or json.loads(evt.data).get("data", {}).get("delta"))
except Exception:
continue
if not isinstance(delta, dict):
continue
for item in delta.get("content", []):
if item.get("type") == "text":
text += item.get("text", "")
elif item.get("type") == "tool_results":
for result in item["tool_results"].get("content", []):
if result.get("type") != "json":
continue
j = result["json"]
text += j.get("text", "")
sql = j.get("sql", sql)
citations.extend({
"source_id": s.get("source_id"),
"doc_id": s.get("doc_id")
} for s in j.get("searchResults", []))
return text, sql, str(citations)
def run(self, query: str, **kwargs):
req = self._build_request(query)
stream = self._agent_service.run(req)
text, sql, citations = self._consume_stream(stream)
results_str = ""
if sql:
try:
df = self._session.sql(sql.rstrip(";")).to_pandas()
results_str = df.to_string(index=False)
except Exception as e:
results_str = f"SQL execution error: {e}"
return text, citations, sql, results_str
cortex_agent = CortexAgentTool(session=snowpark_session_trulens)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query") if args[0].get("agent_query") else None,
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def cortex_agents_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
query = state.get("agent_query", state.get("user_query", ""))
# Call the tool with the string query
text, citations, sql, results_str = cortex_agent.run(query)
# Compose a message content string with all results
message_content = (
f"Answer: {text}\n"
f"Citations: {citations}\n"
f"SQL: {sql}\n"
f"Results:\n{results_str}"
)
# Compose a new HumanMessage with the result
new_message = HumanMessage(content=message_content, name="cortex_researcher")
# Append to the message history
goto = get_next_node(new_message, "orchestrator")
return Command(
update={"messages": [new_message]},
goto=goto,
)
class CortexAgentArgs(BaseModel):
query: str
class CortexAgentTool(StructuredTool):
name: str = "CortexAgent"
description: str = "answers questions using sales conversations and metrics"
args_schema: Type[CortexAgentArgs] = CortexAgentArgs
# Pydantic-compatible private attributes (not validated or required in __init__)
_session: Session = PrivateAttr()
_root: Root = PrivateAttr()
_agent_service: Any = PrivateAttr()
def __init__(self, session: Session):
# initialize parent class without passing custom fields
super().__init__()
self._session = session
self._root = Root(session)
self._agent_service = self._root.cortex_agent_service
def _build_request(self, query: str) -> AgentRunRequest:
return AgentRunRequest.from_dict({
"model": "claude-3-5-sonnet",
"tools": [
{"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "analyst1"}},
{"tool_spec": {"type": "cortex_search", "name": "search1"}},
],
"tool_resources": {
"analyst1": {"semantic_model_file": SEMANTIC_MODEL_FILE},
"search1": {
"name": CORTEX_SEARCH_SERVICE,
"max_results": 10,
"id_column": "conversation_id"
}
},
"messages": [
{"role": "user", "content": [{"type": "text", "text": query}]}
]
})
def _consume_stream(self, stream):
text, sql, citations = "", "", []
for evt in stream.events():
try:
delta = (evt.data.get("delta") if isinstance(evt.data, dict)
else json.loads(evt.data).get("delta")
or json.loads(evt.data).get("data", {}).get("delta"))
except Exception:
continue
if not isinstance(delta, dict):
continue
for item in delta.get("content", []):
if item.get("type") == "text":
text += item.get("text", "")
elif item.get("type") == "tool_results":
for result in item["tool_results"].get("content", []):
if result.get("type") != "json":
continue
j = result["json"]
text += j.get("text", "")
sql = j.get("sql", sql)
citations.extend({
"source_id": s.get("source_id"),
"doc_id": s.get("doc_id")
} for s in j.get("searchResults", []))
return text, sql, str(citations)
def run(self, query: str, **kwargs):
req = self._build_request(query)
stream = self._agent_service.run(req)
text, sql, citations = self._consume_stream(stream)
results_str = ""
if sql:
try:
df = self._session.sql(sql.rstrip(";")).to_pandas()
results_str = df.to_string(index=False)
except Exception as e:
results_str = f"SQL execution error: {e}"
return text, citations, sql, results_str
cortex_agent = CortexAgentTool(session=snowpark_session_trulens)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query") if args[0].get("agent_query") else None,
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def cortex_agents_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
query = state.get("agent_query", state.get("user_query", ""))
# Call the tool with the string query
text, citations, sql, results_str = cortex_agent.run(query)
# Compose a message content string with all results
message_content = (
f"Answer: {text}\n"
f"Citations: {citations}\n"
f"SQL: {sql}\n"
f"Results:\n{results_str}"
)
# Compose a new HumanMessage with the result
new_message = HumanMessage(content=message_content, name="cortex_researcher")
# Append to the message history
goto = get_next_node(new_message, "orchestrator")
return Command(
update={"messages": [new_message]},
goto=goto,
)
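If you want to sanity-check the tool outside the graph, a minimal call looks like this (the query is illustrative and assumes the sales_intelligence resources created earlier exist):
# Illustrative smoke test of the Cortex Agent tool
text, citations, sql, results_str = cortex_agent.run("What is the total deal value by product line?")
print(text)
print(sql)
print(results_str)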
Create Web Search Agent¶
In [ ]:
Copied!
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI  # <-- not langchain.chat_models
llm = ChatOpenAI(model_name="gpt-4o")
tavily_tool = TavilySearchResults(max_results=5)
# Research agent and node
web_search_agent = create_react_agent(
llm,
tools=[tavily_tool],
prompt=make_system_prompt(f"""
You are the Researcher. You can ONLY perform research by using the provided search tool (tavily_tool).
When you have found the necessary information, end your output.
Do NOT attempt to take further actions.
"""),
)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def web_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
result = web_search_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="web_researcher"
)
return Command(
update={
# share internal message history of research agent with other agents
"messages": result["messages"],
},
goto=goto,
)
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI  # <-- not langchain.chat_models
llm = ChatOpenAI(model_name="gpt-4o")
tavily_tool = TavilySearchResults(max_results=5)
# Research agent and node
web_search_agent = create_react_agent(
llm,
tools=[tavily_tool],
prompt=make_system_prompt(f"""
You are the Researcher. You can ONLY perform research by using the provided search tool (tavily_tool).
When you have found the necessary information, end your output.
Do NOT attempt to take further actions.
"""),
)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def web_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
result = web_search_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="web_researcher"
)
return Command(
update={
# share internal message history of research agent with other agents
"messages": result["messages"],
},
goto=goto,
)
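The underlying ReAct agent can also be invoked on its own before wiring it into the graph (assumes TAVILY_API_KEY is set; the query is illustrative):
demo = web_search_agent.invoke(
    {"messages": [HumanMessage(content="Recent news about enterprise AI adoption in healthcare")]}
)
print(demo["messages"][-1].content)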
Create Python REPL Tool¶
In [ ]:
Copied!
repl = PythonREPL()
@tool
def python_repl_tool(
code: Annotated[str, "The python code to execute to generate your chart."],
):
"""Use this to execute python code. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user."""
try:
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}"
result_str = (
f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
)
return (
result_str
+ "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
)
repl = PythonREPL()
@tool
def python_repl_tool(
code: Annotated[str, "The python code to execute to generate your chart."],
):
"""Use this to execute python code. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user."""
try:
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}"
result_str = (
f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
)
return (
result_str
+ "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
)
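As a quick check, the tool can be invoked directly with a small matplotlib snippet (illustrative; the charting agent generates similar code on its own):
print(python_repl_tool.invoke({
    "code": "import matplotlib.pyplot as plt\nplt.bar(['A', 'B', 'C'], [3, 1, 2])\nplt.savefig('example_chart.png')\nprint('saved example_chart.png')"
}))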
Create Charting Agent¶
In [ ]:
Copied!
# Chart generator agent and node
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
chart_agent = create_react_agent(
llm,
[python_repl_tool],
prompt=make_system_prompt(
"You can only generate charts. You are working with a researcher colleague."
),
)
@instrument(
span_type="CHART_GENERATOR_NODE",
attributes=lambda ret, exception, *args, **kwargs: {
f"{BASE_SCOPE}.chart_node_input": args[0]["messages"][-1].content,
f"{BASE_SCOPE}.chart_node_response": (
ret.update["messages"][-1].content
if ret and hasattr(ret, "update") and ret.update
else "No update response"
),
},
)
def chart_node(state: State) -> Command[Literal["orchestrator", END]]:
result = chart_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_generator"
)
return Command(
update={
# share internal message history of chart agent with other agents
"messages": result["messages"],
},
goto=goto,
)
# Chart generator agent and node
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
chart_agent = create_react_agent(
llm,
[python_repl_tool],
prompt=make_system_prompt(
"You can only generate charts. You are working with a researcher colleague."
),
)
@instrument(
span_type="CHART_GENERATOR_NODE",
attributes=lambda ret, exception, *args, **kwargs: {
f"{BASE_SCOPE}.chart_node_input": args[0]["messages"][-1].content,
f"{BASE_SCOPE}.chart_node_response": (
ret.update["messages"][-1].content
if ret and hasattr(ret, "update") and ret.update
else "No update response"
),
},
)
def chart_node(state: State) -> Command[Literal["orchestrator", END]]:
result = chart_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_generator"
)
return Command(
update={
# share internal message history of chart agent with other agents
"messages": result["messages"],
},
goto=goto,
)
Create Chart Summary Agent¶
In [ ]:
Copied!
chart_summary_agent = create_react_agent(
llm,
tools=[], # Add image processing tools if available/needed.
prompt=make_system_prompt(
"You can only generate image captions. You are working with a researcher colleague and a chart generator colleague. "
+ "Your task is to generate a concise summary for the provided chart image saved at a local PATH, where the PATH should be and only be provided by your chart generator colleague. The summary should be no more than 3 sentences."
),
)
@instrument(
span_type="CHART_SUMMARY_NODE",
attributes=lambda ret, exception, *args, **kwargs: {
f"{BASE_SCOPE}.summary_node_input": args[0]["messages"][-1].content,
f"{BASE_SCOPE}.summary_node_output": ret.update["messages"][-1].content
if hasattr(ret, "update")
else "NO SUMMARY GENERATED",
},
)
def chart_summary_node(
state: State,
) -> Command[Literal[END]]:
result = chart_summary_agent.invoke(state)
# After captioning the image, we send control back (e.g., to the researcher)
goto = get_next_node(result["messages"][-1], END)
# Wrap the output message in a HumanMessage to maintain consistency in the conversation flow.
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_summarizer"
)
return Command(
update={"messages": result["messages"]},
goto=goto,
)
chart_summary_agent = create_react_agent(
llm,
tools=[], # Add image processing tools if available/needed.
prompt=make_system_prompt(
"You can only generate image captions. You are working with a researcher colleague and a chart generator colleague. "
+ "Your task is to generate a concise summary for the provided chart image saved at a local PATH, where the PATH should be and only be provided by your chart generator colleague. The summary should be no more than 3 sentences."
),
)
@instrument(
span_type="CHART_SUMMARY_NODE",
attributes=lambda ret, exception, *args, **kwargs: {
f"{BASE_SCOPE}.summary_node_input": args[0]["messages"][-1].content,
f"{BASE_SCOPE}.summary_node_output": ret.update["messages"][-1].content
if hasattr(ret, "update")
else "NO SUMMARY GENERATED",
},
)
def chart_summary_node(
state: State,
) -> Command[Literal[END]]:
result = chart_summary_agent.invoke(state)
# After captioning the image, we send control back (e.g., to the researcher)
goto = get_next_node(result["messages"][-1], END)
# Wrap the output message in a HumanMessage to maintain consistency in the conversation flow.
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_summarizer"
)
return Command(
update={"messages": result["messages"]},
goto=goto,
)
Create a Text Summarizer Agent¶
In [ ]:
Copied!
synthesizer_llm = ChatOpenAI(model_name="gpt-4o")
@instrument()
def synthesizer_node(state: State) -> Command:
"""
Creates a concise, human‑readable summary of the entire interaction,
**purely in prose**.
It ignores structured tables or chart IDs and instead rewrites the
relevant agent messages (research results, chart commentary, etc.)
into a short final answer.
"""
# Gather the latest informative messages (customise as you like)
relevant_msgs = [
m.content for m in state.get("messages", [])
if m.name in ("web_researcher", "cortex_researcher", "chart_generator")
][-8:] # keep the last few
summary_prompt = [
HumanMessage(content=
"Summarize the following context into a clear answer for the user:\n\n"
+ "\n\n".join(relevant_msgs))
]
llm_reply = synthesizer_llm.invoke(summary_prompt)
answer = llm_reply.content.strip()
return Command(
update={
"final_answer": answer,
"messages": [HumanMessage(content=answer, name="synthesizer")],
},
goto=END, # hand off to the END node
)
synthesizer_llm = ChatOpenAI(model_name="gpt-4o")
@instrument()
def synthesizer_node(state: State) -> Command:
"""
Creates a concise, human‑readable summary of the entire interaction,
**purely in prose**.
It ignores structured tables or chart IDs and instead rewrites the
relevant agent messages (research results, chart commentary, etc.)
into a short final answer.
"""
# Gather the latest informative messages (customise as you like)
relevant_msgs = [
m.content for m in state.get("messages", [])
if m.name in ("web_researcher", "cortex_researcher", "chart_generator")
][-8:] # keep the last few
summary_prompt = [
HumanMessage(content=
"Summarize the following context into a clear answer for the user:\n\n"
+ "\n\n".join(relevant_msgs))
]
llm_reply = synthesizer_llm.invoke(summary_prompt)
answer = llm_reply.content.strip()
return Command(
update={
"final_answer": answer,
"messages": [HumanMessage(content=answer, name="synthesizer")],
},
goto=END, # hand off to the END node
)
Build the Agent Graph¶
In [ ]:
Copied!
from langgraph.graph import START, StateGraph
workflow = StateGraph(State)
workflow.add_node("planner", planner_node)
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.add_edge(START, "planner")
graph = workflow.compile()
from langgraph.graph import START, StateGraph
workflow = StateGraph(State)
workflow.add_node("planner", planner_node)
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.add_edge(START, "planner")
graph = workflow.compile()
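Optionally, render the compiled graph to confirm the wiring (requires an environment that can display images; Mermaid PNG rendering may call a remote service):
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))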
Initialize Evaluations¶
In [ ]:
Copied!
import numpy as np
from trulens.core import Feedback
from trulens.core.feedback.selector import Selector
from trulens.providers.openai import OpenAI
provider = OpenAI(model_engine="gpt-4o")
gpa_eval_provider = OpenAI(model_engine="gpt-4.1")
# Define a groundedness feedback function
f_groundedness = (
Feedback(
provider.groundedness_measure_with_cot_reasons, name="Groundedness"
)
.on({
"source": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS,
collect_list=True
)
}
)
.on_output()
)
# Question/answer relevance between overall question and answer.
f_answer_relevance = (
Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
.on_input()
.on_output()
)
# Context relevance between question and each context chunk.
f_context_relevance = (
Feedback(provider.context_relevance_with_cot_reasons, name="Context Relevance")
.on({
"question": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.QUERY_TEXT,
)
}
)
.on({
"context": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS,
collect_list=False
)
}
)
.aggregate(np.mean)
)
# Goal-Plan-Act: Logical consistency of trace
f_logical_consistency = Feedback(
gpa_eval_provider.logical_consistency_with_cot_reasons,
name="Logical Consistency",
).on({
"trace": Selector(trace_level=True),
})
# Goal-Plan-Act: Execution efficiency of trace
f_execution_efficiency = Feedback(
gpa_eval_provider.execution_efficiency_with_cot_reasons,
name="Execution Efficiency",
).on({
"trace": Selector(trace_level=True),
})
# Goal-Plan-Act: Plan adherence
f_plan_adherence = Feedback(
gpa_eval_provider.plan_adherence_with_cot_reasons,
name="Plan Adherence",
).on({
"trace": Selector(trace_level=True),
})
# Goal-Plan-Act: Plan quality
f_plan_quality = Feedback(
gpa_eval_provider.plan_quality_with_cot_reasons,
name="Plan Quality",
).on({
"trace": Selector(trace_level=True),
})
import numpy as np
from trulens.core import Feedback
from trulens.core.feedback.selector import Selector
from trulens.providers.openai import OpenAI
provider = OpenAI(model_engine="gpt-4o")
gpa_eval_provider = OpenAI(model_engine="gpt-4.1")
# Define a groundedness feedback function
f_groundedness = (
Feedback(
provider.groundedness_measure_with_cot_reasons, name="Groundedness"
)
.on({
"source": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS,
collect_list=True
)
}
)
.on_output()
)
# Question/answer relevance between overall question and answer.
f_answer_relevance = (
Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
.on_input()
.on_output()
)
# Context relevance between question and each context chunk.
f_context_relevance = (
Feedback(provider.context_relevance_with_cot_reasons, name="Context Relevance")
.on({
"question": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.QUERY_TEXT,
)
}
)
.on({
"context": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS,
collect_list=False
)
}
)
.aggregate(np.mean)
)
# Goal-Plan-Act: Logical consistency of trace
f_logical_consistency = Feedback(
gpa_eval_provider.logical_consistency_with_cot_reasons,
name="Logical Consistency",
).on({
"trace": Selector(trace_level=True),
})
# Goal-Plan-Act: Execution efficiency of trace
f_execution_efficiency = Feedback(
gpa_eval_provider.execution_efficiency_with_cot_reasons,
name="Execution Efficiency",
).on({
"trace": Selector(trace_level=True),
})
# Goal-Plan-Act: Plan adherence
f_plan_adherence = Feedback(
gpa_eval_provider.plan_adherence_with_cot_reasons,
name="Plan Adherence",
).on({
"trace": Selector(trace_level=True),
})
# Goal-Plan-Act: Plan quality
f_plan_quality = Feedback(
gpa_eval_provider.plan_quality_with_cot_reasons,
name="Plan Quality",
).on({
"trace": Selector(trace_level=True),
})
Create TruLens Session and Register Agent¶
In [ ]:
Copied!
from trulens.core import TruSession
session = TruSession()
session.reset_database()
from trulens.core import TruSession
session = TruSession()
session.reset_database()
In [ ]:
Copied!
from trulens.apps.langgraph import TruGraph
tru_recorder = TruGraph(
graph,
app_name="Sales Data Agent",
app_version="Base",
feedbacks=[
f_answer_relevance,
f_context_relevance,
f_groundedness,
f_logical_consistency,
f_execution_efficiency,
f_plan_adherence,
f_plan_quality,
],
)
from trulens.apps.langgraph import TruGraph
tru_recorder = TruGraph(
graph,
app_name="Sales Data Agent",
app_version="Base",
feedbacks=[
f_answer_relevance,
f_context_relevance,
f_groundedness,
f_logical_consistency,
f_execution_efficiency,
f_plan_adherence,
f_plan_quality,
],
)
Record Agent Usage¶
In [ ]:
Copied!
user_queries = [
"What are our top 3 client deals? Chart the deal value for each",
"What were the key concerns from the call with Healhtech? Is there any recent news that may be driving these concerns? If so, summarize the related news",
]
user_queries = [
"What are our top 3 client deals? Chart the deal value for each",
"What were the key concerns from the call with Healhtech? Is there any recent news that may be driving these concerns? If so, summarize the related news",
]
In [ ]:
Copied!
from langchain_core.runnables import RunnableConfig
thread_config = RunnableConfig(
recursion_limit=150,
configurable={"thread_id": f"run-{uuid.uuid4().hex}"}
)
base_state = {
"current_step": 0,
"plan": [],
"replan_flag": False,
"replan_attempts": {},
}
with tru_recorder as recording:
for query in user_queries:
# Run the multi-agent graph with a sample query
graph.invoke(
{
**base_state,
"messages": [HumanMessage(content=query)],
"user_query": query,
},
config=thread_config
)
from langchain_core.runnables import RunnableConfig
thread_config = RunnableConfig(
recursion_limit=150,
configurable={"thread_id": f"run-{uuid.uuid4().hex}"}
)
base_state = {
"current_step": 0,
"plan": [],
"replan_flag": False,
"replan_attempts": {},
}
with tru_recorder as recording:
for query in user_queries:
# Run the multi-agent graph with a sample query
graph.invoke(
{
**base_state,
"messages": [HumanMessage(content=query)],
"user_query": query,
},
config=thread_config
)
Launch the TruLens Dashboard¶
In [ ]:
Copied!
from trulens.dashboard import run_dashboard
run_dashboard()
from trulens.dashboard import run_dashboard
run_dashboard()
Add In-line Evaluations¶
In [ ]:
Copied!
from trulens.apps.langgraph.inline_evaluations import inline_evaluation
# eval for in-line evaluation
f_context_relevance = (
Feedback(
provider.context_relevance_with_cot_reasons, name="Context Relevance",
criteria = "Context is fully relevant if it includes all of the information needed to answer the question, regardless of whether any visualizations or charts are included in the context."
)
.on({
"question": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.QUERY_TEXT,
)
}
)
.on({
"context": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS,
collect_list=False
)
}
)
.aggregate(np.mean)
)
from trulens.apps.langgraph.inline_evaluations import inline_evaluation
# eval for in-line evaluation
f_context_relevance = (
Feedback(
provider.context_relevance_with_cot_reasons, name="Context Relevance",
criteria = "Context is fully relevant if it includes all of the information needed to answer the question, regardless of whether any visualizations or charts are included in the context."
)
.on({
"question": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.QUERY_TEXT,
)
}
)
.on({
"context": Selector(
span_type=SpanAttributes.SpanType.RETRIEVAL,
span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS,
collect_list=False
)
}
)
.aggregate(np.mean)
)
Add in-line evaluation to web search and cortex agent nodes¶
In [ ]:
Copied!
@inline_evaluation(f_context_relevance)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def web_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
result = web_search_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="web_researcher"
)
return Command(
update={
# share internal message history of research agent with other agents
"messages": result["messages"],
},
goto=goto,
)
@inline_evaluation(f_context_relevance)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def cortex_agents_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
query = state.get("agent_query", state.get("user_query", ""))
# Call the tool with the string query
text, citations, sql, results_str = cortex_agent.run(query)
# Compose a message content string with all results
message_content = (
f"Answer: {text}\n"
f"Citations: {citations}\n"
f"SQL: {sql}\n"
f"Results:\n{results_str}"
)
# Compose a new HumanMessage with the result
new_message = HumanMessage(content=message_content, name="cortex_researcher")
# Append to the message history
goto = get_next_node(new_message, "orchestrator")
return Command(
update={"messages": [new_message]},
goto=goto,
)
@inline_evaluation(f_context_relevance)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def web_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
result = web_search_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="web_researcher"
)
return Command(
update={
# share internal message history of research agent with other agents
"messages": result["messages"],
},
goto=goto,
)
@inline_evaluation(f_context_relevance)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def cortex_agents_research_node(
state: State,
) -> Command[Literal["orchestrator", END]]:
query = state.get("agent_query", state.get("user_query", ""))
# Call the tool with the string query
text, citations, sql, results_str = cortex_agent.run(query)
# Compose a message content string with all results
message_content = (
f"Answer: {text}\n"
f"Citations: {citations}\n"
f"SQL: {sql}\n"
f"Results:\n{results_str}"
)
# Compose a new HumanMessage with the result
new_message = HumanMessage(content=message_content, name="cortex_researcher")
# Append to the message history
goto = get_next_node(new_message, "orchestrator")
return Command(
update={"messages": [new_message]},
goto=goto,
)
Recompile graph¶
In [ ]:
Copied!
workflow = StateGraph(State)
workflow.add_node("planner", planner_node)
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.add_edge(START, "planner")
graph_v2 = workflow.compile()
workflow = StateGraph(State)
workflow.add_node("planner", planner_node)
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.add_edge(START, "planner")
graph_v2 = workflow.compile()
Register new version of the agent¶
In [ ]:
Copied!
tru_recorder_v2 = TruGraph(
graph_v2,
app_name="Sales Data Agent",
app_version="In-line Evaluation",
feedbacks=[
f_answer_relevance,
f_context_relevance,
f_groundedness,
f_logical_consistency,
f_execution_efficiency,
f_plan_adherence,
f_plan_quality,
],
)
tru_recorder_v2 = TruGraph(
graph_v2,
app_name="Sales Data Agent",
app_version="In-line Evaluation",
feedbacks=[
f_answer_relevance,
f_context_relevance,
f_groundedness,
f_logical_consistency,
f_execution_efficiency,
f_plan_adherence,
f_plan_quality,
],
)
In [ ]:
Copied!
with tru_recorder_v2 as recording:
for query in user_queries:
# Run the multi-agent graph with a sample query
graph_v2.invoke(
{
**base_state,
"messages": [HumanMessage(content=query)],
"user_query": query,
},
config=thread_config
)
with tru_recorder_v2 as recording:
for query in user_queries:
# Run the multi-agent graph with a sample query
graph_v2.invoke(
{
**base_state,
"messages": [HumanMessage(content=query)],
"user_query": query,
},
config=thread_config
)
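Once both versions have recorded runs, you can compare them in the dashboard launched earlier, or pull the aggregate scores programmatically:
# Aggregate feedback scores per app version
session.get_leaderboard()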