Multi-agent Data Tasks¶
Build a sales data agent that can operate across private and public data, perform research, answer questions, and generate charts.
In [ ]:
%%capture --no-stderr
# pip install -U langchain_community langchain_openai langchain_experimental matplotlib langgraph google-search-results snowflake.core trulens-core trulens-connectors-snowflake trulens-providers-openai trulens-apps-langgraph
Choose an app name¶
In [ ]:
APP_NAME = "Sales Data Agent" # set this app name for your use case
Set the resources for Cortex Agent¶
In [ ]:
SEMANTIC_MODEL_FILE = "@sales_intelligence.data.models/sales_metrics_model.yaml"
CORTEX_SEARCH_SERVICE = "sales_intelligence.data.sales_conversation_search"
Set Keys¶
In [ ]:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["TAVILY_API_KEY"] = "tvly-dev-..."
# AI Observability
os.environ["SNOWFLAKE_ACCOUNT"] = "..."
os.environ["SNOWFLAKE_USER"] = "..."
os.environ["SNOWFLAKE_USER_PASSWORD"] = "..."
os.environ["SNOWFLAKE_DATABASE"] = "SALES_INTELLIGENCE"
os.environ["SNOWFLAKE_SCHEMA"] = "DATA"
os.environ["SNOWFLAKE_ROLE"] = "ACCOUNTADMIN"
os.environ["SNOWFLAKE_WAREHOUSE"] = "COMPUTE_WH"
os.environ["SNOWFLAKE_PAT"] = "..." # cortex agent call
os.environ["TRULENS_OTEL_TRACING"] = (
"1" # to enable OTEL tracing
)
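The values above are placeholders. If you prefer not to keep secrets in the notebook source, one option is to prompt for them at runtime. A minimal sketch using the standard library's getpass (adapt the list of variable names to your setup):

In [ ]:
import os
from getpass import getpass

# Prompt for any secret that is not already set, so credentials never live in the notebook source.
for secret in ["OPENAI_API_KEY", "TAVILY_API_KEY", "SNOWFLAKE_USER_PASSWORD", "SNOWFLAKE_PAT"]:
    if not os.environ.get(secret):
        os.environ[secret] = getpass(f"Enter {secret}: ")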
Run the following setup SQL in your Snowflake account to create resources¶
In [ ]:
-- Create database and schema
CREATE OR REPLACE DATABASE sales_intelligence;
CREATE OR REPLACE SCHEMA sales_intelligence.data;
CREATE OR REPLACE WAREHOUSE sales_intelligence_wh;
USE DATABASE sales_intelligence;
USE SCHEMA data;
-- Create tables for sales data
CREATE TABLE sales_conversations (
conversation_id VARCHAR,
transcript_text TEXT,
customer_name VARCHAR,
deal_stage VARCHAR,
sales_rep VARCHAR,
conversation_date TIMESTAMP,
deal_value FLOAT,
product_line VARCHAR
);
CREATE TABLE sales_metrics (
deal_id VARCHAR,
customer_name VARCHAR,
deal_value FLOAT,
close_date DATE,
sales_stage VARCHAR,
win_status BOOLEAN,
sales_rep VARCHAR,
product_line VARCHAR
);
-- First, let's insert data into sales_conversations
INSERT INTO sales_conversations
(conversation_id, transcript_text, customer_name, deal_stage, sales_rep, conversation_date, deal_value, product_line)
VALUES
('CONV001', 'Initial discovery call with TechCorp Inc''s IT Director and Solutions Architect. Client showed strong interest in our enterprise solution features, particularly the automated workflow capabilities. The main discussion centered around integration timeline and complexity. They currently use Legacy System X for their core operations and expressed concerns about potential disruption during migration. The team asked detailed questions about API compatibility and data migration tools.
Action items include providing a detailed integration timeline document, scheduling a technical deep-dive with their infrastructure team, and sharing case studies of similar Legacy System X migrations. The client mentioned a Q2 budget allocation for digital transformation initiatives. Overall, it was a positive engagement with clear next steps.', 'TechCorp Inc', 'Discovery', 'Sarah Johnson', '2024-01-15 10:30:00', 75000, 'Enterprise Suite'),
('CONV002', 'Follow-up call with SmallBiz Solutions'' Operations Manager and Finance Director. The primary focus was on pricing structure and ROI timeline. They compared our Basic Package pricing with Competitor Y''s small business offering. Key discussion points included monthly vs. annual billing options, user license limitations, and potential cost savings from process automation.
The client requested a detailed ROI analysis focusing on time saved in daily operations, resource allocation improvements, and projected efficiency gains. Budget constraints were clearly communicated, with a maximum budget of $30K for this year. They showed interest in starting with the basic package with room for a potential upgrade in Q4. Next steps include providing a competitive analysis and a customized ROI calculator by next week.', 'SmallBiz Solutions', 'Negotiation', 'Mike Chen', '2024-01-16 14:45:00', 25000, 'Basic Package'),
('CONV003', 'Strategy session with SecureBank Ltd''s CISO and Security Operations team. Extremely positive 90-minute deep dive into our Premium Security package. Customer emphasized immediate need for implementation due to recent industry compliance updates. Our advanced security features, especially multi-factor authentication and encryption protocols, were identified as perfect fits for their requirements. Technical team was particularly impressed with our zero-trust architecture approach and real-time threat monitoring capabilities. They''ve already secured budget approval and have executive buy-in. Compliance documentation is ready for review. Action items include: finalizing implementation timeline, scheduling security audit, and preparing necessary documentation for their risk assessment team. Client ready to move forward with contract discussions.', 'SecureBank Ltd', 'Closing', 'Rachel Torres', '2024-01-17 11:20:00', 150000, 'Premium Security'),
('CONV004', 'Comprehensive discovery call with GrowthStart Up''s CTO and Department Heads. Team of 500+ employees across 3 continents discussed current challenges with their existing solution. Major pain points identified: system crashes during peak usage, limited cross-department reporting capabilities, and poor scalability for remote teams. Deep dive into their current workflow revealed bottlenecks in data sharing and collaboration. Technical requirements gathered for each department. Platform demo focused on scalability features and global team management capabilities. Client particularly interested in our API ecosystem and custom reporting engine. Next steps: schedule department-specific workflow analysis and prepare detailed platform migration plan.', 'GrowthStart Up', 'Discovery', 'Sarah Johnson', '2024-01-18 09:15:00', 100000, 'Enterprise Suite'),
('CONV005', 'In-depth demo session with DataDriven Co''s Analytics team and Business Intelligence managers. Showcase focused on advanced analytics capabilities, custom dashboard creation, and real-time data processing features. Team was particularly impressed with our machine learning integration and predictive analytics models. Competitor comparison requested specifically against Market Leader Z and Innovative Start-up X. Price point falls within their allocated budget range, but team expressed interest in multi-year commitment with corresponding discount structure. Technical questions centered around data warehouse integration and custom visualization capabilities. Action items: prepare detailed competitor feature comparison matrix and draft multi-year pricing proposals with various discount scenarios.', 'DataDriven Co', 'Demo', 'James Wilson', '2024-01-19 13:30:00', 85000, 'Analytics Pro'),
('CONV006', 'Extended technical deep dive with HealthTech Solutions'' IT Security team, Compliance Officer, and System Architects. Four-hour session focused on API infrastructure, data security protocols, and compliance requirements. Team raised specific concerns about HIPAA compliance, data encryption standards, and API rate limiting. Detailed discussion of our security architecture, including: end-to-end encryption, audit logging, and disaster recovery protocols. Client requires extensive documentation on compliance certifications, particularly SOC 2 and HITRUST. Security team performed initial architecture review and requested additional information about: database segregation, backup procedures, and incident response protocols. Follow-up session scheduled with their compliance team next week.', 'HealthTech Solutions', 'Technical Review', 'Rachel Torres', '2024-01-20 15:45:00', 120000, 'Premium Security'),
('CONV007', 'Contract review meeting with LegalEase Corp''s General Counsel, Procurement Director, and IT Manager. Detailed analysis of SLA terms, focusing on uptime guarantees and support response times. Legal team requested specific modifications to liability clauses and data handling agreements. Procurement raised questions about payment terms and service credit structure. Key discussion points included: disaster recovery commitments, data retention policies, and exit clause specifications. IT Manager confirmed technical requirements are met pending final security assessment. Agreement reached on most terms, with only SLA modifications remaining for discussion. Legal team to provide revised contract language by end of week. Overall positive session with clear path to closing.', 'LegalEase Corp', 'Negotiation', 'Mike Chen', '2024-01-21 10:00:00', 95000, 'Enterprise Suite'),
('CONV008', 'Quarterly business review with GlobalTrade Inc''s current implementation team and potential expansion stakeholders. Current implementation in Finance department showcasing strong adoption rates and 40% improvement in processing times. Discussion focused on expanding solution to Operations and HR departments. Users highlighted positive experiences with customer support and platform stability. Challenges identified in current usage: need for additional custom reports and increased automation in workflow processes. Expansion requirements gathered from Operations Director: inventory management integration, supplier portal access, and enhanced tracking capabilities. HR team interested in recruitment and onboarding workflow automation. Next steps: prepare department-specific implementation plans and ROI analysis for expansion.', 'GlobalTrade Inc', 'Expansion', 'James Wilson', '2024-01-22 14:20:00', 45000, 'Basic Package'),
('CONV009', 'Emergency planning session with FastTrack Ltd''s Executive team and Project Managers. Critical need for rapid implementation due to current system failure. Team willing to pay premium for expedited deployment and dedicated support team. Detailed discussion of accelerated implementation timeline and resource requirements. Key requirements: minimal disruption to operations, phased data migration, and emergency support protocols. Technical team confident in meeting aggressive timeline with additional resources. Executive sponsor emphasized importance of going live within 30 days. Immediate next steps: finalize expedited implementation plan, assign dedicated support team, and begin emergency onboarding procedures. Team to reconvene daily for progress updates.', 'FastTrack Ltd', 'Closing', 'Sarah Johnson', '2024-01-23 16:30:00', 180000, 'Premium Security'),
('CONV010', 'Quarterly strategic review with UpgradeNow Corp''s Department Heads and Analytics team. Current implementation meeting basic needs but team requiring more sophisticated analytics capabilities. Deep dive into current usage patterns revealed opportunities for workflow optimization and advanced reporting needs. Users expressed strong satisfaction with platform stability and basic features, but requiring enhanced data visualization and predictive analytics capabilities. Analytics team presented specific requirements: custom dashboard creation, advanced data modeling tools, and integrated BI features. Discussion about upgrade path from current package to Analytics Pro tier. ROI analysis presented showing potential 60% improvement in reporting efficiency. Team to present upgrade proposal to executive committee next month.', 'UpgradeNow Corp', 'Expansion', 'Rachel Torres', '2024-01-24 11:45:00', 65000, 'Analytics Pro');
-- Now, let's insert corresponding data into sales_metrics
INSERT INTO sales_metrics
(deal_id, customer_name, deal_value, close_date, sales_stage, win_status, sales_rep, product_line)
VALUES
('DEAL001', 'TechCorp Inc', 75000, '2024-02-15', 'Closed', true, 'Sarah Johnson', 'Enterprise Suite'),
('DEAL002', 'SmallBiz Solutions', 25000, '2024-02-01', 'Lost', false, 'Mike Chen', 'Basic Package'),
('DEAL003', 'SecureBank Ltd', 150000, '2024-01-30', 'Closed', true, 'Rachel Torres', 'Premium Security'),
('DEAL004', 'GrowthStart Up', 100000, '2024-02-10', 'Pending', false, 'Sarah Johnson', 'Enterprise Suite'),
('DEAL005', 'DataDriven Co', 85000, '2024-02-05', 'Closed', true, 'James Wilson', 'Analytics Pro'),
('DEAL006', 'HealthTech Solutions', 120000, '2024-02-20', 'Pending', false, 'Rachel Torres', 'Premium Security'),
('DEAL007', 'LegalEase Corp', 95000, '2024-01-25', 'Closed', true, 'Mike Chen', 'Enterprise Suite'),
('DEAL008', 'GlobalTrade Inc', 45000, '2024-02-08', 'Closed', true, 'James Wilson', 'Basic Package'),
('DEAL009', 'FastTrack Ltd', 180000, '2024-02-12', 'Closed', true, 'Sarah Johnson', 'Premium Security'),
('DEAL010', 'UpgradeNow Corp', 65000, '2024-02-18', 'Pending', false, 'Rachel Torres', 'Analytics Pro');
-- Enable change tracking
ALTER TABLE sales_conversations SET CHANGE_TRACKING = TRUE;
-- Create the search service
CREATE OR REPLACE CORTEX SEARCH SERVICE sales_conversation_search
ON transcript_text
ATTRIBUTES customer_name, deal_stage, sales_rep, product_line, conversation_date, deal_value
WAREHOUSE = sales_intelligence_wh
TARGET_LAG = '1 minute'
AS (
SELECT
conversation_id,
transcript_text,
customer_name,
deal_stage,
sales_rep,
conversation_date,
deal_value,
product_line
FROM sales_conversations
WHERE conversation_date >= '2024-01-01' -- Fixed date instead of CURRENT_TIMESTAMP
);
CREATE OR REPLACE STAGE models
DIRECTORY = (ENABLE = TRUE);
Upload the semantic model:¶
1. Download sales_metrics_model.yaml (NOTE: Do NOT right-click to download.)
2. Navigate to Data » Databases » SALES_INTELLIGENCE » DATA » Stages » MODELS.
3. Click "+ Files" in the top right.
4. Browse to and select the sales_metrics_model.yaml file.
5. Click "Upload".
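Alternatively, you can upload the file programmatically with a Snowpark PUT. This is a minimal sketch; it assumes a Snowpark Session is available (for example, the snowpark_session_trulens created later in this notebook) and that sales_metrics_model.yaml is in your working directory:

In [ ]:
# Upload the semantic model YAML to the models stage without compression,
# so Cortex Analyst can read it directly.
snowpark_session_trulens.file.put(
    "sales_metrics_model.yaml",
    "@sales_intelligence.data.models",
    auto_compress=False,
    overwrite=True,
)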
Create Network Policy and PAT¶
In [ ]:
/*------------------------------------------------------------*/
/* 1) As ACCOUNTADMIN: create a place to store the rule */
/*------------------------------------------------------------*/
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE DATABASE securitydb;
CREATE OR REPLACE SCHEMA securitydb.myrules;
/* Give SECURITYADMIN everything it needs in that schema */
GRANT USAGE ON DATABASE securitydb TO ROLE SECURITYADMIN;
GRANT USAGE ON SCHEMA securitydb.myrules TO ROLE SECURITYADMIN;
GRANT CREATE NETWORK RULE ON SCHEMA securitydb.myrules TO ROLE SECURITYADMIN;
/*------------------------------------------------------------*/
/* 2) Switch to SECURITYADMIN and create the rule */
/*------------------------------------------------------------*/
USE ROLE SECURITYADMIN;
USE DATABASE securitydb;
USE SCHEMA myrules;
CREATE OR REPLACE NETWORK RULE allow_all_ingress
MODE = INGRESS -- protects service (and internal stage if enabled)
TYPE = IPV4
VALUE_LIST = ('0.0.0.0/0') -- “everything”
COMMENT = 'Allows all IPv4 traffic (public + private)';
/*------------------------------------------------------------*/
/* 3) Create network policy with the rule                      */
/*------------------------------------------------------------*/
CREATE OR REPLACE NETWORK POLICY open_access_policy
ALLOWED_NETWORK_RULE_LIST = ('allow_all_ingress')
COMMENT = 'Permits all IPv4 traffic – used for PAT testing';
/*------------------------------------------------------------*/
/* 4) Attach the policy to the account                         */
/*------------------------------------------------------------*/
ALTER ACCOUNT SET NETWORK_POLICY = open_access_policy;
/*------------------------------------------------------------*/
/* 5) Create the PAT (replace <your_user> with your username)  */
/*------------------------------------------------------------*/
ALTER USER <your_user> ADD PROGRAMMATIC ACCESS TOKEN agents_token;
Import libraries¶
In [ ]:
import ast
import datetime
import json
import os
import time
import uuid
from typing import Annotated, Any, List, Literal, Type

import pandas as pd
import requests
from langchain.load.dump import dumps
from langchain.prompts import PromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.documents import Document
from langchain_core.messages import AIMessage
from langchain_core.messages import BaseMessage
from langchain_core.messages import HumanMessage
from langchain_core.messages import SystemMessage
from langchain_core.messages import ToolMessage
from langchain_core.tools import StructuredTool
from langchain_core.tools import Tool
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_experimental.utilities import PythonREPL
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings
from langgraph.graph import END
from langgraph.graph import MessagesState
from langgraph.graph import START
from langgraph.graph import StateGraph
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from pydantic import BaseModel, PrivateAttr
from snowflake.core import Root
from snowflake.core.cortex.lite_agent_service import AgentRunRequest
from snowflake.snowpark import Session
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.core.otel.instrument import instrument
from trulens.core.run import Run
from trulens.core.run import RunConfig
from trulens.otel.semconv.trace import BASE_SCOPE
from trulens.otel.semconv.trace import SpanAttributes
Create TruLens/Snowflake Connection¶
In [ ]:
# Snowflake account for trulens
snowflake_connection_parameters = {
"account": os.environ["SNOWFLAKE_ACCOUNT"],
"user": os.environ["SNOWFLAKE_USER"],
"password": os.environ["SNOWFLAKE_USER_PASSWORD"],
"database": os.environ["SNOWFLAKE_DATABASE"],
"schema": os.environ["SNOWFLAKE_SCHEMA"],
"role": os.environ["SNOWFLAKE_ROLE"],
"warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
}
snowpark_session_trulens = Session.builder.configs(
snowflake_connection_parameters
).create()
trulens_sf_connector = SnowflakeConnector(
snowpark_session=snowpark_session_trulens
)
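As an optional sanity check, you can confirm the session is pointed at the expected context before wiring it into TruLens:

In [ ]:
# Verify the Snowpark session is using the intended role, warehouse, database, and schema.
print(
    snowpark_session_trulens.sql(
        "SELECT CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA()"
    ).collect()
)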
Create Orchestrator¶
In [ ]:
from langchain.prompts import PromptTemplate
llm = ChatOpenAI(model="gpt-4o")
@instrument()
def orchestration_node(state: MessagesState) -> Command:
# Find the most recent user query
user_query = None
user_index = None
for i, msg in reversed(list(enumerate(state["messages"]))):
if isinstance(msg, HumanMessage) and getattr(msg, "name", None) is None:
user_query = msg.content
user_index = i
break
if user_query is None:
user_query = state["messages"][0].content
user_index = 0
# Find the most recent research answer after the user query
research_answer = None
for msg in reversed(state["messages"][user_index+1:]):
if getattr(msg, "name", None) in ("web_researcher", "cortex_agents_researcher"):
research_answer = msg.content
break
# Compose prompt for the LLM
prompt = (
"You are an orchestration agent for a multi-agent data assistant.\n"
"Given the user query and the most recent research answer, decide if the research answer contains all the information needed to answer the query, or if more research is needed.\n"
"- Respond 'done' if the research answer is sufficient to answer the query and no chart is requested or if a chart has been generated.\n"
"- Respond 'web' if more public data research is needed.\n"
"- Respond 'cortex' if more private/company data research is needed.\n"
"- Respond 'chart' if a chart is requested and the research answer is sufficient to generate a chart.\n"
"\n"
"User query:\n"
f"{user_query}\n"
"\n"
"Most recent research answer:\n"
f"{research_answer if research_answer else '[No research answer yet]'}\n"
"\n"
"Respond with only one word: 'cortex', 'web', or 'none'."
)
decision = llm.invoke(prompt).content.strip().lower()
# Route accordingly
if "cortex" in decision:
return Command(update={}, goto="cortex_agent_researcher")
elif "web" in decision:
return Command(update={}, goto="web_researcher")
elif "done" in decision:
return Command(update={}, goto=END)
elif "chart" in decision:
return Command(update={}, goto="chart_generator")
else:
return Command(update={}, goto="chart_generator")
def get_next_node(last_message: BaseMessage, goto: str):
if "FINAL ANSWER" in last_message.content:
# Any agent decided the work is done
return END
return goto
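You can optionally exercise the router on its own before building the full graph. A minimal check (the question is just an example; with no research answer in the state yet, the router will typically choose 'cortex' or 'web'):

In [ ]:
# Call the orchestration node directly with a bare user query and inspect where it routes.
cmd = orchestration_node(
    {"messages": [HumanMessage(content="What deals closed in January 2024?")]}
)
print(cmd.goto)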
Create agent system prompt¶
In [ ]:
def make_system_prompt(suffix: str) -> str:
return (
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK, another assistant with different tools "
" will help where you left off. Execute what you can to make progress."
" If you or any of the other assistants have the final answer or deliverable,"
" prefix your response with FINAL ANSWER so the team knows to stop."
f"\n{suffix}"
)
Initialize Cortex Agent for Doc Search + SQL¶
In [ ]:
class CortexAgentArgs(BaseModel):
query: str
class CortexAgentTool(StructuredTool):
name: str = "CortexAgent"
description: str = "answers questions using sales conversations and metrics"
args_schema: Type[CortexAgentArgs] = CortexAgentArgs
# Pydantic-compatible private attributes (not validated or required in __init__)
_session: Session = PrivateAttr()
_root: Root = PrivateAttr()
_agent_service: Any = PrivateAttr()
def __init__(self, session: Session):
# initialize parent class without passing custom fields
super().__init__()
self._session = session
self._root = Root(session)
self._agent_service = self._root.cortex_agent_service
def _build_request(self, query: str) -> AgentRunRequest:
return AgentRunRequest.from_dict({
"model": "claude-3-5-sonnet",
"tools": [
{"tool_spec": {"type": "cortex_analyst_text_to_sql", "name": "analyst1"}},
{"tool_spec": {"type": "cortex_search", "name": "search1"}},
],
"tool_resources": {
"analyst1": {"semantic_model_file": SEMANTIC_MODEL_FILE},
"search1": {
"name": CORTEX_SEARCH_SERVICE,
"max_results": 10,
"id_column": "conversation_id"
}
},
"messages": [
{"role": "user", "content": [{"type": "text", "text": query}]}
]
})
def _consume_stream(self, stream):
text, sql, citations = "", "", []
for evt in stream.events():
try:
delta = (evt.data.get("delta") if isinstance(evt.data, dict)
else json.loads(evt.data).get("delta")
or json.loads(evt.data).get("data", {}).get("delta"))
except Exception:
continue
if not isinstance(delta, dict):
continue
for item in delta.get("content", []):
if item.get("type") == "text":
text += item.get("text", "")
elif item.get("type") == "tool_results":
for result in item["tool_results"].get("content", []):
if result.get("type") != "json":
continue
j = result["json"]
text += j.get("text", "")
sql = j.get("sql", sql)
citations.extend({
"source_id": s.get("source_id"),
"doc_id": s.get("doc_id")
} for s in j.get("searchResults", []))
return text, sql, str(citations)
def run(self, query: str, **kwargs):
req = self._build_request(query)
stream = self._agent_service.run(req)
text, sql, citations = self._consume_stream(stream)
results_str = ""
if sql:
try:
df = self._session.sql(sql.rstrip(";")).to_pandas()
results_str = df.to_string(index=False)
except Exception as e:
results_str = f"SQL execution error: {e}"
return text, citations, sql, results_str
cortex_agent = CortexAgentTool(session=snowpark_session_trulens)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0]["messages"][-1].content,
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def cortex_agents_research_node(
state: MessagesState,
) -> Command[Literal["orchestrator", END]]:
# Extract the latest user message as a string
last_message = state["messages"][-1]
if hasattr(last_message, "content"):
query = last_message.content
else:
query = str(last_message)
# Call the tool with the string query
text, citations, sql, results_str = cortex_agent.run(query)
# Compose a message content string with all results
message_content = (
f"Answer: {text}\n"
f"Citations: {citations}\n"
f"SQL: {sql}\n"
f"Results:\n{results_str}"
)
# Compose a new HumanMessage with the result
new_message = HumanMessage(content=message_content, name="cortex_agents_researcher")
# Append to the message history
messages = list(state["messages"]) + [new_message]
goto = get_next_node(new_message, "orchestrator")
return Command(
update={"messages": messages},
goto=goto,
)
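Before wiring the tool into the graph, you can smoke-test it on its own. The question below is only an example; any query answerable from the sample sales data works:

In [ ]:
# Ad-hoc check that the Cortex Agent tool returns text, citations, generated SQL, and query results.
text, citations, sql, results_str = cortex_agent.run(
    "What is the total deal value of closed deals?"
)
print(text)
print(sql)
print(results_str)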
Create Python REPL Tool¶
In [ ]:
repl = PythonREPL()
@tool
def python_repl_tool(
code: Annotated[str, "The python code to execute to generate your chart."],
):
"""Use this to execute python code. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user."""
try:
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}"
result_str = (
f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
)
return (
result_str
+ "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
)
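The REPL tool behaves like any other LangChain tool, so you can invoke it directly to see the format the chart agent will receive. A minimal check, not part of the pipeline:

In [ ]:
# Directly invoke the REPL tool with a trivial snippet to inspect its return format.
print(python_repl_tool.invoke({"code": "print(2 + 2)"}))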
Create Web Search Agent¶
In [ ]:
tavily_tool = TavilySearchResults(max_results=5)
# Research agent and node
web_search_agent = create_react_agent(
llm,
tools=[tavily_tool],
prompt=make_system_prompt(
"You can only do research. You are working with a chart generator colleague."
),
)
@instrument(
span_type=SpanAttributes.SpanType.RETRIEVAL,
attributes=lambda ret, exception, *args, **kwargs: {
SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0]["messages"][-1].content,
SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
ret.update["messages"][-1].content
] if hasattr(ret, "update") else "No tool call",
},
)
def web_research_node(
state: MessagesState,
) -> Command[Literal["orchestrator", END]]:
result = web_search_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="web_researcher"
)
return Command(
update={
# share internal message history of research agent with other agents
"messages": result["messages"],
},
goto=goto,
)
Create Charting Agent¶
In [ ]:
# Chart generator agent and node
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
chart_agent = create_react_agent(
llm,
[python_repl_tool],
prompt=make_system_prompt(
"You can only generate charts. You are working with a researcher colleague."
),
)
@instrument(
span_type="CHART_GENERATOR_NODE",
attributes=lambda ret, exception, *args, **kwargs: {
f"{BASE_SCOPE}.chart_node_input": args[0]["messages"][-1].content,
f"{BASE_SCOPE}.chart_node_response": (
ret.update["messages"][-1].content
if ret and hasattr(ret, "update") and ret.update
else "No update response"
),
},
)
def chart_node(state: MessagesState) -> Command[Literal["orchestrator", END]]:
result = chart_agent.invoke(state)
goto = get_next_node(result["messages"][-1], "orchestrator")
# wrap in a human message, as not all providers allow
# AI message at the last position of the input messages list
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_generator"
)
return Command(
update={
# share internal message history of chart agent with other agents
"messages": result["messages"],
},
goto=goto,
)
Create Chart Summary Agent¶
In [ ]:
chart_summary_agent = create_react_agent(
llm,
tools=[], # Add image processing tools if available/needed.
prompt=make_system_prompt(
"You can only generate image captions. You are working with a researcher colleague and a chart generator colleague. "
+ "Your task is to generate a concise summary for the provided chart image saved at a local PATH, where the PATH should be and only be provided by your chart generator colleague. The summary should be no more than 3 sentences."
),
)
@instrument(
span_type="CHART_SUMMARY_NODE",
attributes=lambda ret, exception, *args, **kwargs: {
f"{BASE_SCOPE}.summary_node_input": args[0]["messages"][-1].content,
f"{BASE_SCOPE}.summary_node_output": ret.update["messages"][-1].content
if hasattr(ret, "update")
else "NO SUMMARY GENERATED",
},
)
def chart_summary_node(
state: MessagesState,
) -> Command[Literal[END]]:
result = chart_summary_agent.invoke(state)
# After captioning the image, we send control back (e.g., to the researcher)
goto = get_next_node(result["messages"][-1], END)
# Wrap the output message in a HumanMessage to maintain consistency in the conversation flow.
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_summarizer"
)
return Command(
update={"messages": result["messages"]},
goto=goto,
)
Build the Agent Graph¶
In [ ]:
from langgraph.graph import START
from langgraph.graph import StateGraph
workflow = StateGraph(MessagesState)
workflow.add_node("orchestrator", orchestration_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_agent_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_edge(START, "orchestrator")
graph = workflow.compile()
from langgraph.graph import START
from langgraph.graph import StateGraph
workflow = StateGraph(MessagesState)
workflow.add_node("orchestrator", orchestration_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_agent_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_edge(START, "orchestrator")
graph = workflow.compile()
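Because every node returns a Command with an explicit goto, the only static edge needed is START -> orchestrator. To sanity-check the wiring, the compiled graph can usually be rendered as a Mermaid diagram (this assumes a LangGraph version that exposes get_graph().draw_mermaid()):
# Optional: print a Mermaid definition of the compiled graph to verify node wiring.
print(graph.get_graph().draw_mermaid())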
Wrap the agent graph for invocation¶
In [ ]:
Copied!
class TruAgent:
def __init__(self):
self.graph = graph
def invoke_agent_graph(self, query: str) -> str:
# Optionally rebuild the graph for each query:
# self.graph = workflow.compile()
# Initialize state with proper message format
state = {"messages": [HumanMessage(content=query)]}
# Stream events with recursion limit
events = self.graph.stream(
state,
{"recursion_limit": 100},
)
# Track all messages through the conversation
all_messages = []
for event in events:
# Get the payload from the event
_, payload = next(iter(event.items()))
if not payload: # Skip empty payloads
continue
messages = payload.get("messages")
if not messages:
continue
all_messages.extend(messages)
# Return the last message's content if available
return (
all_messages[-1].content
if all_messages and hasattr(all_messages[-1], "content")
else ""
)
tru_agent = TruAgent()
class TruAgent:
def __init__(self):
self.graph = graph
def invoke_agent_graph(self, query: str) -> str:
# Optionally rebuild the graph for each query:
# self.graph = workflow.compile()
# Initialize state with proper message format
state = {"messages": [HumanMessage(content=query)]}
# Stream events with recursion limit
events = self.graph.stream(
state,
{"recursion_limit": 100},
)
# Track all messages through the conversation
all_messages = []
for event in events:
# Get the payload from the event
_, payload = next(iter(event.items()))
if not payload: # Skip empty payloads
continue
messages = payload.get("messages")
if not messages:
continue
all_messages.extend(messages)
# Return the last message's content if available
return (
all_messages[-1].content
if all_messages and hasattr(all_messages[-1], "content")
else ""
)
tru_agent = TruAgent()
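You can optionally smoke-test the wrapper with a single query before adding evaluations; note that this runs the full graph once, so it will call the configured LLM, Cortex Agent, and search tools:
# Optional smoke test: one query through the graph, outside of TruLens recording.
print(tru_agent.invoke_agent_graph("What are our top 3 client deals?"))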
Initialize Evaluations¶
In [ ]:
Copied!
import numpy as np
from trulens.core import Feedback
from trulens.core.feedback.selector import Selector
from trulens.providers.openai import OpenAI
provider = OpenAI(model_engine="gpt-4.1-mini")
traj_eval_provider = OpenAI(model_engine="o3")
# Define a groundedness feedback function
f_groundedness = (
Feedback(
provider.groundedness_measure_with_cot_reasons, name="Groundedness"
)
.on_context(collect_list=True)
.on_output()
)
# Question/answer relevance between overall question and answer.
f_answer_relevance = (
Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
.on_input()
.on_output()
)
# Context relevance between question and each context chunk.
f_context_relevance = (
Feedback(
provider.context_relevance_with_cot_reasons, name="Context Relevance"
)
.on_input()
.on_context(collect_list=False)
.aggregate(np.mean) # choose a different aggregation method if you wish
)
# Trajectory evaluations: step relevance of trace given user query
f_step_relevance = Feedback(
traj_eval_provider.trajectory_step_relevance_with_cot_reasons, name="Step Relevance"
).on({
"trace": Selector(trace_level=True),
})
# Trajectory evaluations: logical consistency of trace
f_logical_consistency = Feedback(
traj_eval_provider.trajectory_logical_consistency_with_cot_reasons,
name="Logical Consistency",
).on({
"trace": Selector(trace_level=True),
})
# Trajectory evaluations: workflow efficiency of trace
f_workflow_efficiency = Feedback(
traj_eval_provider.trajectory_workflow_efficiency_with_cot_reasons,
name="Workflow Efficiency",
).on({
"trace": Selector(trace_level=True),
})
import numpy as np
from trulens.core import Feedback
from trulens.core.feedback.selector import Selector
from trulens.providers.openai import OpenAI
provider = OpenAI(model_engine="gpt-4.1-mini")
traj_eval_provider = OpenAI(model_engine="o3")
# Define a groundedness feedback function
f_groundedness = (
Feedback(
provider.groundedness_measure_with_cot_reasons, name="Groundedness"
)
.on_context(collect_list=True)
.on_output()
)
# Question/answer relevance between overall question and answer.
f_answer_relevance = (
Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
.on_input()
.on_output()
)
# Context relevance between question and each context chunk.
f_context_relevance = (
Feedback(
provider.context_relevance_with_cot_reasons, name="Context Relevance"
)
.on_input()
.on_context(collect_list=False)
.aggregate(np.mean) # choose a different aggregation method if you wish
)
# Trajectory evaluations: step relevance of trace given user query
f_step_relevance = Feedback(
traj_eval_provider.trajectory_step_relevance_with_cot_reasons, name="Step Relevance"
).on({
"trace": Selector(trace_level=True),
})
# Trajectory evaluations: logical consistency of trace
f_logical_consistency = Feedback(
traj_eval_provider.trajectory_logical_consistency_with_cot_reasons,
name="Logical Consistency",
).on({
"trace": Selector(trace_level=True),
})
# Trajectory evaluations: workflow efficiency of trace
f_workflow_efficiency = Feedback(
traj_eval_provider.trajectory_workflow_efficiency_with_cot_reasons,
name="Workflow Efficiency",
).on({
"trace": Selector(trace_level=True),
})
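Feedback functions can also be exercised directly on sample text, which is a quick way to confirm the provider is configured before running the full agent. The call below assumes the TruLens OpenAI provider's relevance_with_cot_reasons(prompt, response) signature and uses a toy prompt/response pair:
# Optional: sanity-check the feedback provider on a toy example.
score, reasons = provider.relevance_with_cot_reasons(
    "What is the total value of our top deals?",
    "Our top three deals together are worth about $2M.",
)
print(score, reasons)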
Create TruLens Session and Register Agent¶
In [ ]:
Copied!
from trulens.core import TruSession
session = TruSession()
session.reset_database()
from trulens.core import TruSession
session = TruSession()
session.reset_database()
In [ ]:
Copied!
from trulens.apps.langgraph import TruGraph
tru_recorder = TruGraph(
tru_agent,
app_name="Sales Data Agent",
app_version="Base",
feedbacks=[
f_answer_relevance,
f_context_relevance,
f_groundedness,
f_step_relevance,
f_logical_consistency,
f_workflow_efficiency,
],
)
from trulens.apps.langgraph import TruGraph
tru_recorder = TruGraph(
tru_agent,
app_name="Sales Data Agent",
app_version="Base",
feedbacks=[
f_answer_relevance,
f_context_relevance,
f_groundedness,
f_step_relevance,
f_logical_consistency,
f_workflow_efficiency,
],
)
Record Agent Usage¶
In [ ]:
Copied!
import pandas as pd
user_queries = [
"What are our top 3 client deals? Chart the ACV for each deal",
"Tell me about the call with Securebank?",
"Give me an overview of security requirements commonly required for banking software",
]
user_queries_df = pd.DataFrame(user_queries, columns=["query"])
import pandas as pd
user_queries = [
"What are our top 3 client deals? Chart the ACV for each deal",
"Tell me about the call with Securebank?",
"Give me an overview of security requirements commonly required for banking software",
]
user_queries_df = pd.DataFrame(user_queries, columns=["query"])
In [ ]:
Copied!
with tru_recorder as recording:
for query in user_queries:
# Run the multi-agent graph with a sample query
result = tru_agent.invoke_agent_graph(query)
with tru_recorder as recording:
for query in user_queries:
# Run the multi-agent graph with a sample query
result = tru_agent.invoke_agent_graph(query)
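Once the loop finishes, the traces and feedback results are stored in the TruLens database and can be pulled back into the notebook; for example, TruSession exposes an aggregate leaderboard of feedback scores per app version (shown here as an optional check):
# Optional: view aggregate feedback scores for the recorded app versions.
session.get_leaderboard()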
Launch the TruLens Dashboard¶
In [ ]:
Copied!
from trulens.dashboard import run_dashboard
run_dashboard()
from trulens.dashboard import run_dashboard
run_dashboard()
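run_dashboard starts a local Streamlit app for exploring the recorded traces and feedback results. If the default port is busy, it typically accepts an explicit port, and the dashboard can be stopped from the notebook as well (a sketch assuming the current trulens.dashboard API; adjust if your version differs):
# Optional: pin the dashboard to a specific port and stop it when finished.
from trulens.dashboard import stop_dashboard
run_dashboard(session, port=8502)
# ...
stop_dashboard(session)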