Skip to content

trulens.connectors.snowflake

trulens.connectors.snowflake

Additional Dependency Required

To use this module, you must have the trulens-connectors-snowflake package installed.

pip install trulens-connectors-snowflake

Classes

SnowflakeConnector

Bases: DBConnector

Connector to snowflake databases.

Attributes
RECORDS_BATCH_TIMEOUT_IN_SEC class-attribute instance-attribute
RECORDS_BATCH_TIMEOUT_IN_SEC: int = 10

Time to wait before inserting a batch of records into the database.

Functions
reset_database
reset_database()

Reset the database. Clears all tables.

See DB.reset_database.

migrate_database
migrate_database(**kwargs: Any)

Migrates the database.

This should be run whenever there are breaking changes in a database created with an older version of trulens.

PARAMETER DESCRIPTION
**kwargs

Keyword arguments to pass to migrate_database of the current database.

TYPE: Any DEFAULT: {}

See DB.migrate_database.

add_record
add_record(
    record: Optional[Record] = None, **kwargs: Any
) -> RecordID

Add a record to the database.

PARAMETER DESCRIPTION
record

The record to add.

TYPE: Optional[Record] DEFAULT: None

**kwargs

Record fields to add to the given record or a new record if no record provided.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
RecordID

Unique record identifier str.

add_record_nowait
add_record_nowait(record: Record) -> None

Add a record to the queue to be inserted in the next batch.

add_app
add_app(app: AppDefinition) -> AppID

Add an app to the database and return its unique id.

PARAMETER DESCRIPTION
app

The app to add to the database.

TYPE: AppDefinition

RETURNS DESCRIPTION
AppID

A unique app identifier str.

delete_app
delete_app(app_id: AppID) -> None

Deletes an app from the database based on its app_id.

PARAMETER DESCRIPTION
app_id

The unique identifier of the app to be deleted.

TYPE: AppID

add_feedback_definition
add_feedback_definition(
    feedback_definition: FeedbackDefinition,
) -> FeedbackDefinitionID

Add a feedback definition to the database and return its unique id.

PARAMETER DESCRIPTION
feedback_definition

The feedback definition to add to the database.

TYPE: FeedbackDefinition

RETURNS DESCRIPTION
FeedbackDefinitionID

A unique feedback definition identifier str.

add_feedback
add_feedback(
    feedback_result_or_future: Optional[
        Union[FeedbackResult, Future[FeedbackResult]]
    ] = None,
    **kwargs: Any
) -> FeedbackResultID

Add a single feedback result or future to the database and return its unique id.

PARAMETER DESCRIPTION
feedback_result_or_future

If a concurrent.futures.Future is given, call will wait for the result before adding it to the database. If kwargs are given and a FeedbackResult is also given, the kwargs will be used to update the FeedbackResult otherwise a new one will be created with kwargs as arguments to its constructor.

TYPE: Optional[Union[FeedbackResult, Future[FeedbackResult]]] DEFAULT: None

**kwargs

Fields to add to the given feedback result or to create a new FeedbackResult with.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
FeedbackResultID

A unique result identifier str.

add_feedbacks
add_feedbacks(
    feedback_results: Iterable[
        Union[FeedbackResult, Future[FeedbackResult]]
    ],
) -> List[FeedbackResultID]

Add multiple feedback results to the database and return their unique ids.

TODO: This is slow and should be batched or otherwise optimized in the future.
PARAMETER DESCRIPTION
feedback_results

An iterable with each iteration being a FeedbackResult or concurrent.futures.Future of the same. Each given future will be waited.

TYPE: Iterable[Union[FeedbackResult, Future[FeedbackResult]]]

RETURNS DESCRIPTION
List[FeedbackResultID]

List of unique result identifiers str in the same order as input feedback_results.

get_app
get_app(app_id: AppID) -> Optional[JSONized[AppDefinition]]

Look up an app from the database.

This method produces the JSON-ized version of the app. It can be deserialized back into an AppDefinition with model_validate:

Example
from trulens.core.schema import app
app_json = session.get_app(app_id="Custom Application v1")
app = app.AppDefinition.model_validate(app_json)
Warning

Do not rely on deserializing into App as its implementations feature attributes not meant to be deserialized.

PARAMETER DESCRIPTION
app_id

The unique identifier str of the app to look up.

TYPE: AppID

RETURNS DESCRIPTION
Optional[JSONized[AppDefinition]]

JSON-ized version of the app.

get_apps
get_apps() -> List[JSONized[AppDefinition]]

Look up all apps from the database.

RETURNS DESCRIPTION
List[JSONized[AppDefinition]]

A list of JSON-ized version of all apps in the database.

Warning

Same Deserialization caveats as get_app.

get_records_and_feedback
get_records_and_feedback(
    app_ids: Optional[List[AppID]] = None,
    app_name: Optional[AppName] = None,
    app_version: Optional[AppVersion] = None,
    app_versions: Optional[List[AppVersion]] = None,
    run_name: Optional[RunName] = None,
    record_ids: Optional[List[RecordID]] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> Tuple[DataFrame, List[str]]

Get records, their feedback results, and feedback names.

PARAMETER DESCRIPTION
app_ids

A list of app ids to filter records by. If empty or not given, all apps' records will be returned.

TYPE: Optional[List[AppID]] DEFAULT: None

app_name

A name of the app to filter records by. If given, only records for this app will be returned.

TYPE: Optional[AppName] DEFAULT: None

app_version

A version of the app to filter records by. If given, only records for this app version will be returned.

TYPE: Optional[AppVersion] DEFAULT: None

app_versions

A list of app versions to filter records by. If given, only records for these app versions will be returned.

TYPE: Optional[List[AppVersion]] DEFAULT: None

run_name

A run name to filter records by. If given, only records for this run will be returned.

TYPE: Optional[RunName] DEFAULT: None

record_ids

An optional list of record ids to filter records by.

TYPE: Optional[List[RecordID]] DEFAULT: None

offset

Record row offset.

TYPE: Optional[int] DEFAULT: None

limit

Limit on the number of records to return.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Tuple of:

List[str]
  • DataFrame of records with their feedback results.
Tuple[DataFrame, List[str]]
  • List of feedback names that are columns in the DataFrame.
get_leaderboard
get_leaderboard(
    app_ids: Optional[List[AppID]] = None,
    group_by_metadata_key: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> DataFrame

Get a leaderboard for the given apps.

PARAMETER DESCRIPTION
app_ids

A list of app ids to filter records by. If empty or not given, all apps will be included in leaderboard.

TYPE: Optional[List[AppID]] DEFAULT: None

group_by_metadata_key

A key included in record metadata that you want to group results by.

TYPE: Optional[str] DEFAULT: None

limit

Limit on the number of records to aggregate to produce the leaderboard.

TYPE: Optional[int] DEFAULT: None

offset

Record row offset to select which records to use to aggregate the leaderboard.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame of apps with their feedback results aggregated.

DataFrame

If group_by_metadata_key is provided, the DataFrame will be grouped by the specified key.

add_event
add_event(event: Event)

Add an event to the database.

PARAMETER DESCRIPTION
event

The event to add to the database.

TYPE: Event

add_events
add_events(events: List[Event])

Add multiple events to the database.

PARAMETER DESCRIPTION
events

A list of events to add to the database.

TYPE: List[Event]

get_events
get_events(
    app_name: Optional[str] = None,
    app_version: Optional[str] = None,
    record_ids: Optional[List[str]] = None,
    start_time: Optional[datetime] = None,
) -> DataFrame

Get events from the database.

PARAMETER DESCRIPTION
app_name

The app name to filter events by.

TYPE: Optional[str] DEFAULT: None

app_version

The app version to filter events by.

TYPE: Optional[str] DEFAULT: None

record_ids

The record ids to filter events by.

TYPE: Optional[List[str]] DEFAULT: None

start_time

The minimum time to consider events from.

TYPE: Optional[datetime] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

A pandas DataFrame of all relevant events.

warn_if_parallel
warn_if_parallel(run: Any) -> None

Hook called from Run before parallel invocation. Connectors may emit warnings here. Default is a no-op.

augment_app
augment_app(app: Any, **kwargs: Any) -> None

Populate Snowflake-specific fields on the App after it is constructed. Replaces the previous isinstance(connector, SnowflakeConnector) branch in [App.init][trulens.core.app.App.__init__].

warn_if_metric_parallel
warn_if_metric_parallel(value: int) -> None

Emit a Snowflake-specific warning when metric_max_workers > 1.

get_events_for_client_metrics
get_events_for_client_metrics(
    app_name: Optional[str] = None,
    app_version: Optional[str] = None,
    run_name: Optional[str] = None,
)

Snowflake-aware events retrieval for client-side metric computation. When use_account_event_table is enabled, fetches and normalizes raw events from the account event table; otherwise delegates to the standard get_events path.

SnowflakeImpl

Bases: DefaultImpl

Alembic DDL impl marker for the snowflake dialect.

SnowflakeSQLAlchemyDB

Bases: SQLAlchemyDB

SQLAlchemyDB subclass that hosts all Snowflake-dialect-specific overrides.

Attributes
redact_keys class-attribute instance-attribute

Redact secrets before writing out data.

table_prefix class-attribute instance-attribute
table_prefix: str = DEFAULT_DATABASE_PREFIX

The prefix to use for all table names.

DB interface requirement.

engine_params class-attribute instance-attribute
engine_params: dict = Field(default_factory=dict)

SQLAlchemy-related engine params.

session_params class-attribute instance-attribute
session_params: dict = Field(default_factory=dict)

SQLAlchemy-related session.

engine class-attribute instance-attribute
engine: Optional[Engine] = None

SQLAlchemy engine.

session class-attribute instance-attribute
session: Optional[sessionmaker] = None

SQLAlchemy session(maker).

orm instance-attribute
orm: Type[ORM]

Container of all the ORM classes for this database.

This should be set to a subclass of ORM upon initialization.

Functions
__repr__
__repr__() -> str

Safe repr that handles circular references.

Pydantic's default __repr__ does not guard against circular references among model instances, which leads to RecursionError (see GitHub issue #1862). This override uses the same formatted_objects context-variable that __rich_repr__ uses so that already-visited objects are replaced with a short placeholder instead of recursing infinitely.

__rich_repr__
__rich_repr__() -> Result

Requirement for pretty printing using the rich package.

__str__
__str__() -> str

Relatively concise identifier string for this instance.

reset_database
reset_database()
migrate_database
migrate_database(prior_prefix: Optional[str] = None)
check_db_revision
check_db_revision()
insert_record
insert_record(record: Record) -> RecordID
insert_app
insert_app(app: AppDefinition) -> AppID
delete_app
delete_app(app_id: AppID) -> None

Deletes an app from the database based on its app_id.

PARAMETER DESCRIPTION
app_id

The unique identifier of the app to be deleted.

TYPE: AppID

insert_feedback_definition
insert_feedback_definition(
    feedback_definition: FeedbackDefinition,
) -> FeedbackDefinitionID
get_feedback_defs
get_feedback_defs(
    feedback_definition_id: Optional[
        FeedbackDefinitionID
    ] = None,
) -> DataFrame
insert_feedback
insert_feedback(
    feedback_result: FeedbackResult,
) -> FeedbackResultID
batch_insert_feedback
batch_insert_feedback(
    feedback_results: List[FeedbackResult],
) -> List[FeedbackResultID]
get_feedback
get_feedback(
    record_id: Optional[RecordID] = None,
    feedback_result_id: Optional[FeedbackResultID] = None,
    feedback_definition_id: Optional[
        FeedbackDefinitionID
    ] = None,
    status: Optional[
        Union[
            FeedbackResultStatus,
            Sequence[FeedbackResultStatus],
        ]
    ] = None,
    last_ts_before: Optional[datetime] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    shuffle: Optional[bool] = False,
    run_location: Optional[FeedbackRunLocation] = None,
) -> DataFrame
get_feedback_count_by_status
get_feedback_count_by_status(
    record_id: Optional[RecordID] = None,
    feedback_result_id: Optional[FeedbackResultID] = None,
    feedback_definition_id: Optional[
        FeedbackDefinitionID
    ] = None,
    status: Optional[
        Union[
            FeedbackResultStatus,
            Sequence[FeedbackResultStatus],
        ]
    ] = None,
    last_ts_before: Optional[datetime] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    shuffle: bool = False,
    run_location: Optional[FeedbackRunLocation] = None,
) -> Dict[FeedbackResultStatus, int]
get_app
get_app(app_id: AppID) -> Optional[JSONized]

See DB.get_app.

get_apps
get_apps(
    app_name: Optional[AppName] = None,
) -> Iterable[JSON]
update_app_metadata
update_app_metadata(
    app_id: AppID, metadata: Dict[str, Any]
) -> Optional[AppDefinition]
get_records_and_feedback
get_records_and_feedback(
    app_ids: Optional[List[AppID]] = None,
    app_name: Optional[AppName] = None,
    app_version: Optional[AppVersion] = None,
    app_versions: Optional[List[AppVersion]] = None,
    run_name: Optional[RunName] = None,
    record_ids: Optional[List[RecordID]] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> Tuple[DataFrame, Sequence[str]]
get_leaderboard_aggregates
get_leaderboard_aggregates(
    app_name: Optional[AppName] = None,
    app_versions: Optional[List[AppVersion]] = None,
) -> Tuple[DataFrame, List[str]]
insert_ground_truth
insert_ground_truth(
    ground_truth: GroundTruth,
) -> GroundTruthID
batch_insert_ground_truth
batch_insert_ground_truth(
    ground_truths: List[GroundTruth],
) -> List[GroundTruthID]
get_ground_truth
get_ground_truth(
    ground_truth_id: str | None = None,
) -> Optional[JSONized]
get_ground_truths_by_dataset
get_ground_truths_by_dataset(
    dataset_name: str,
) -> DataFrame | None
insert_dataset
insert_dataset(dataset: Dataset) -> DatasetID
get_datasets
get_datasets() -> DataFrame
insert_event
insert_event(event: Event) -> EventID
get_events
get_events(
    app_name: Optional[str],
    app_version: Optional[str],
    record_ids: Optional[List[str]],
    start_time: Optional[datetime],
) -> DataFrame
extract_app_and_run_info staticmethod
extract_app_and_run_info(
    attributes: Dict[str, Any],
    resource_attributes: Dict[str, Any],
) -> Tuple[str, str, str, str]

Get app info from attributes.

PARAMETER DESCRIPTION
attributes

Span attributes of record root.

TYPE: Dict[str, Any]

resource_attributes

Resource attributes of record root.

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Tuple[str, str, str, str]

Tuple of: app name, app version, app id, and run name.

from_tru_args classmethod
from_tru_args(
    database_url: Optional[str] = None,
    database_engine: Optional[Engine] = None,
    database_redact_keys: Optional[
        bool
    ] = DEFAULT_DATABASE_REDACT_KEYS,
    database_prefix: Optional[
        str
    ] = DEFAULT_DATABASE_PREFIX,
    **kwargs: Dict[str, Any]
) -> SQLAlchemyDB

Process database-related configuration provided to the Tru class to create a database.

Emits warnings if appropriate.

from_db_url classmethod
from_db_url(
    url: str, **kwargs: Dict[str, Any]
) -> SQLAlchemyDB

Create a database for the given url.

PARAMETER DESCRIPTION
url

The database url. This includes database type.

TYPE: str

kwargs

Additional arguments to pass to the database constructor.

TYPE: Dict[str, Any] DEFAULT: {}

RETURNS DESCRIPTION
SQLAlchemyDB

A database instance.

from_db_engine classmethod
from_db_engine(
    engine: Engine, **kwargs: Dict[str, Any]
) -> SQLAlchemyDB

Create a database for the given engine. Args: engine: The database engine. kwargs: Additional arguments to pass to the database constructor. Returns: A database instance.

insert_events
insert_events(events: List[Event]) -> List[EventID]

Insert a batch of events into the database.