trulens.connectors.snowflake¶
trulens.connectors.snowflake
¶
Additional Dependency Required
To use this module, you must have the trulens-connectors-snowflake package installed.
pip install trulens-connectors-snowflake
Classes¶
SnowflakeConnector
¶
Bases: DBConnector
Connector to snowflake databases.
Attributes¶
RECORDS_BATCH_TIMEOUT_IN_SEC
class-attribute
instance-attribute
¶
RECORDS_BATCH_TIMEOUT_IN_SEC: int = 10
Time to wait before inserting a batch of records into the database.
Functions¶
migrate_database
¶
migrate_database(**kwargs: Any)
Migrates the database.
This should be run whenever there are breaking changes in a database created with an older version of trulens.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
Keyword arguments to pass to migrate_database of the current database.
TYPE:
|
See DB.migrate_database.
add_record
¶
add_record_nowait
¶
add_record_nowait(record: Record) -> None
Add a record to the queue to be inserted in the next batch.
add_app
¶
add_app(app: AppDefinition) -> AppID
Add an app to the database and return its unique id.
| PARAMETER | DESCRIPTION |
|---|---|
app
|
The app to add to the database.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
AppID
|
A unique app identifier |
delete_app
¶
delete_app(app_id: AppID) -> None
Deletes an app from the database based on its app_id.
| PARAMETER | DESCRIPTION |
|---|---|
app_id
|
The unique identifier of the app to be deleted.
TYPE:
|
add_feedback_definition
¶
add_feedback_definition(
feedback_definition: FeedbackDefinition,
) -> FeedbackDefinitionID
Add a feedback definition to the database and return its unique id.
| PARAMETER | DESCRIPTION |
|---|---|
feedback_definition
|
The feedback definition to add to the database.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
FeedbackDefinitionID
|
A unique feedback definition identifier |
add_feedback
¶
add_feedback(
feedback_result_or_future: Optional[
Union[FeedbackResult, Future[FeedbackResult]]
] = None,
**kwargs: Any
) -> FeedbackResultID
Add a single feedback result or future to the database and return its unique id.
| PARAMETER | DESCRIPTION |
|---|---|
feedback_result_or_future
|
If a
TYPE:
|
**kwargs
|
Fields to add to the given feedback result or to create a new FeedbackResult with.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
FeedbackResultID
|
A unique result identifier str. |
add_feedbacks
¶
add_feedbacks(
feedback_results: Iterable[
Union[FeedbackResult, Future[FeedbackResult]]
],
) -> List[FeedbackResultID]
Add multiple feedback results to the database and return their unique ids.
TODO: This is slow and should be batched or otherwise optimized in the future.¶
| PARAMETER | DESCRIPTION |
|---|---|
feedback_results
|
An iterable with each iteration being a FeedbackResult or
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
List[FeedbackResultID]
|
List of unique result identifiers |
get_app
¶
get_app(app_id: AppID) -> Optional[JSONized[AppDefinition]]
Look up an app from the database.
This method produces the JSON-ized version of the app. It can be deserialized back into an AppDefinition with model_validate:
Example
from trulens.core.schema import app
app_json = session.get_app(app_id="Custom Application v1")
app = app.AppDefinition.model_validate(app_json)
Warning
Do not rely on deserializing into App as its implementations feature attributes not meant to be deserialized.
| PARAMETER | DESCRIPTION |
|---|---|
app_id
|
The unique identifier
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Optional[JSONized[AppDefinition]]
|
JSON-ized version of the app. |
get_apps
¶
get_apps() -> List[JSONized[AppDefinition]]
Look up all apps from the database.
| RETURNS | DESCRIPTION |
|---|---|
List[JSONized[AppDefinition]]
|
A list of JSON-ized version of all apps in the database. |
Warning
Same Deserialization caveats as get_app.
get_records_and_feedback
¶
get_records_and_feedback(
app_ids: Optional[List[AppID]] = None,
app_name: Optional[AppName] = None,
app_version: Optional[AppVersion] = None,
app_versions: Optional[List[AppVersion]] = None,
run_name: Optional[RunName] = None,
record_ids: Optional[List[RecordID]] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
) -> Tuple[DataFrame, List[str]]
Get records, their feedback results, and feedback names.
| PARAMETER | DESCRIPTION |
|---|---|
app_ids
|
A list of app ids to filter records by. If empty or not given, all apps' records will be returned. |
app_name
|
A name of the app to filter records by. If given, only records for this app will be returned. |
app_version
|
A version of the app to filter records by. If given, only records for this app version will be returned.
TYPE:
|
app_versions
|
A list of app versions to filter records by. If given, only records for these app versions will be returned.
TYPE:
|
run_name
|
A run name to filter records by. If given, only records for this run will be returned. |
record_ids
|
An optional list of record ids to filter records by. |
offset
|
Record row offset. |
limit
|
Limit on the number of records to return. |
| RETURNS | DESCRIPTION |
|---|---|
DataFrame
|
Tuple of: |
List[str]
|
|
Tuple[DataFrame, List[str]]
|
|
get_leaderboard
¶
get_leaderboard(
app_ids: Optional[List[AppID]] = None,
group_by_metadata_key: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> DataFrame
Get a leaderboard for the given apps.
| PARAMETER | DESCRIPTION |
|---|---|
app_ids
|
A list of app ids to filter records by. If empty or not given, all apps will be included in leaderboard. |
group_by_metadata_key
|
A key included in record metadata that you want to group results by. |
limit
|
Limit on the number of records to aggregate to produce the leaderboard. |
offset
|
Record row offset to select which records to use to aggregate the leaderboard. |
| RETURNS | DESCRIPTION |
|---|---|
DataFrame
|
DataFrame of apps with their feedback results aggregated. |
DataFrame
|
If group_by_metadata_key is provided, the DataFrame will be grouped by the specified key. |
add_event
¶
add_event(event: Event)
Add an event to the database.
| PARAMETER | DESCRIPTION |
|---|---|
event
|
The event to add to the database.
TYPE:
|
add_events
¶
get_events
¶
get_events(
app_name: Optional[str] = None,
app_version: Optional[str] = None,
record_ids: Optional[List[str]] = None,
start_time: Optional[datetime] = None,
) -> DataFrame
Get events from the database.
| PARAMETER | DESCRIPTION |
|---|---|
app_name
|
The app name to filter events by. |
app_version
|
The app version to filter events by. |
record_ids
|
The record ids to filter events by. |
start_time
|
The minimum time to consider events from. |
| RETURNS | DESCRIPTION |
|---|---|
DataFrame
|
A pandas DataFrame of all relevant events. |
warn_if_parallel
¶
warn_if_parallel(run: Any) -> None
Hook called from Run before parallel invocation. Connectors may emit warnings here. Default is a no-op.
augment_app
¶
Populate Snowflake-specific fields on the App after it is
constructed. Replaces the previous isinstance(connector,
SnowflakeConnector) branch in
[App.init][trulens.core.app.App.__init__].
warn_if_metric_parallel
¶
warn_if_metric_parallel(value: int) -> None
Emit a Snowflake-specific warning when metric_max_workers > 1.
get_events_for_client_metrics
¶
get_events_for_client_metrics(
app_name: Optional[str] = None,
app_version: Optional[str] = None,
run_name: Optional[str] = None,
)
Snowflake-aware events retrieval for client-side metric
computation. When use_account_event_table is enabled, fetches and
normalizes raw events from the account event table; otherwise
delegates to the standard get_events path.
SnowflakeImpl
¶
Bases: DefaultImpl
Alembic DDL impl marker for the snowflake dialect.
SnowflakeSQLAlchemyDB
¶
Bases: SQLAlchemyDB
SQLAlchemyDB subclass that hosts all Snowflake-dialect-specific overrides.
Attributes¶
redact_keys
class-attribute
instance-attribute
¶
redact_keys: bool = DEFAULT_DATABASE_REDACT_KEYS
Redact secrets before writing out data.
table_prefix
class-attribute
instance-attribute
¶
table_prefix: str = DEFAULT_DATABASE_PREFIX
The prefix to use for all table names.
DB interface requirement.
engine_params
class-attribute
instance-attribute
¶
SQLAlchemy-related engine params.
session_params
class-attribute
instance-attribute
¶
SQLAlchemy-related session.
session
class-attribute
instance-attribute
¶
session: Optional[sessionmaker] = None
SQLAlchemy session(maker).
orm
instance-attribute
¶
Container of all the ORM classes for this database.
This should be set to a subclass of ORM upon initialization.
Functions¶
__repr__
¶
__repr__() -> str
Safe repr that handles circular references.
Pydantic's default __repr__ does not guard against
circular references among model instances, which leads to
RecursionError (see GitHub issue #1862). This override
uses the same formatted_objects context-variable that
__rich_repr__ uses so that already-visited objects are
replaced with a short placeholder instead of recursing
infinitely.
delete_app
¶
delete_app(app_id: AppID) -> None
Deletes an app from the database based on its app_id.
| PARAMETER | DESCRIPTION |
|---|---|
app_id
|
The unique identifier of the app to be deleted.
TYPE:
|
insert_feedback_definition
¶
insert_feedback_definition(
feedback_definition: FeedbackDefinition,
) -> FeedbackDefinitionID
get_feedback_defs
¶
get_feedback_defs(
feedback_definition_id: Optional[
FeedbackDefinitionID
] = None,
) -> DataFrame
See DB.get_feedback_defs.
insert_feedback
¶
insert_feedback(
feedback_result: FeedbackResult,
) -> FeedbackResultID
See DB.insert_feedback.
batch_insert_feedback
¶
batch_insert_feedback(
feedback_results: List[FeedbackResult],
) -> List[FeedbackResultID]
get_feedback
¶
get_feedback(
record_id: Optional[RecordID] = None,
feedback_result_id: Optional[FeedbackResultID] = None,
feedback_definition_id: Optional[
FeedbackDefinitionID
] = None,
status: Optional[
Union[
FeedbackResultStatus,
Sequence[FeedbackResultStatus],
]
] = None,
last_ts_before: Optional[datetime] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
shuffle: Optional[bool] = False,
run_location: Optional[FeedbackRunLocation] = None,
) -> DataFrame
See DB.get_feedback.
get_feedback_count_by_status
¶
get_feedback_count_by_status(
record_id: Optional[RecordID] = None,
feedback_result_id: Optional[FeedbackResultID] = None,
feedback_definition_id: Optional[
FeedbackDefinitionID
] = None,
status: Optional[
Union[
FeedbackResultStatus,
Sequence[FeedbackResultStatus],
]
] = None,
last_ts_before: Optional[datetime] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
shuffle: bool = False,
run_location: Optional[FeedbackRunLocation] = None,
) -> Dict[FeedbackResultStatus, int]
update_app_metadata
¶
get_records_and_feedback
¶
get_records_and_feedback(
app_ids: Optional[List[AppID]] = None,
app_name: Optional[AppName] = None,
app_version: Optional[AppVersion] = None,
app_versions: Optional[List[AppVersion]] = None,
run_name: Optional[RunName] = None,
record_ids: Optional[List[RecordID]] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
) -> Tuple[DataFrame, Sequence[str]]
get_leaderboard_aggregates
¶
insert_ground_truth
¶
insert_ground_truth(
ground_truth: GroundTruth,
) -> GroundTruthID
batch_insert_ground_truth
¶
batch_insert_ground_truth(
ground_truths: List[GroundTruth],
) -> List[GroundTruthID]
get_ground_truth
¶
See DB.get_ground_truth.
get_ground_truths_by_dataset
¶
get_events
¶
get_events(
app_name: Optional[str],
app_version: Optional[str],
record_ids: Optional[List[str]],
start_time: Optional[datetime],
) -> DataFrame
See DB.get_events.
extract_app_and_run_info
staticmethod
¶
from_tru_args
classmethod
¶
from_tru_args(
database_url: Optional[str] = None,
database_engine: Optional[Engine] = None,
database_redact_keys: Optional[
bool
] = DEFAULT_DATABASE_REDACT_KEYS,
database_prefix: Optional[
str
] = DEFAULT_DATABASE_PREFIX,
**kwargs: Dict[str, Any]
) -> SQLAlchemyDB
Process database-related configuration provided to the Tru class to create a database.
Emits warnings if appropriate.
from_db_url
classmethod
¶
from_db_url(
url: str, **kwargs: Dict[str, Any]
) -> SQLAlchemyDB
Create a database for the given url.
| PARAMETER | DESCRIPTION |
|---|---|
url
|
The database url. This includes database type.
TYPE:
|
kwargs
|
Additional arguments to pass to the database constructor. |
| RETURNS | DESCRIPTION |
|---|---|
SQLAlchemyDB
|
A database instance. |
from_db_engine
classmethod
¶
from_db_engine(
engine: Engine, **kwargs: Dict[str, Any]
) -> SQLAlchemyDB
Create a database for the given engine. Args: engine: The database engine. kwargs: Additional arguments to pass to the database constructor. Returns: A database instance.