Skip to content

Tru

Bases: SingletonPerName

Tru is the main class that provides an entry point to trulens-eval. Tru lets you:

  • Log app prompts and outputs
  • Log app Metadata
  • Run and log feedback functions
  • Run streamlit dashboard to view experiment results

By default, all data is logged to the current working directory to default.sqlite. Data can be logged to a SQLAlchemy-compatible database referred to by database_url.

Source code in trulens_eval/trulens_eval/tru.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
class Tru(SingletonPerName):
    """
    Tru is the main class that provides an entry point to trulens-eval. Tru lets you:

    * Log app prompts and outputs
    * Log app Metadata
    * Run and log feedback functions
    * Run streamlit dashboard to view experiment results

    By default, all data is logged to the current working directory to `default.sqlite`. 
    Data can be logged to a SQLAlchemy-compatible database referred to by `database_url`.
    """
    DEFAULT_DATABASE_FILE = "default.sqlite"

    # Process or Thread of the deferred feedback function evaluator.
    evaluator_proc = None

    # Process of the dashboard app.
    dashboard_proc = None

    def Chain(self, chain, **kwargs):
        """
        Create a TruChain with database managed by self.
        """

        # Local import to avoid a circular import at module load time.
        from trulens_eval.tru_chain import TruChain

        return TruChain(tru=self, app=chain, **kwargs)

    def Llama(self, engine, **kwargs):
        """
        Create a llama_index engine with database managed by self.
        """

        # Local import to avoid a circular import at module load time.
        from trulens_eval.tru_llama import TruLlama

        return TruLlama(tru=self, app=engine, **kwargs)

    def __init__(
        self,
        database_url: Optional[str] = None,
        database_file: Optional[str] = None,
        database_redact_keys: bool = False
    ):
        """
        TruLens instrumentation, logging, and feedback functions for apps.

        Args:
           database_url: SQLAlchemy database URL. Defaults to a local
                                SQLite database file at 'default.sqlite'.
                                See [this article](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls)
                                on SQLAlchemy database URLs.
           database_file: (Deprecated) Path to a local SQLite database file.
           database_redact_keys: whether to redact secret keys in data to be written to database.
        """
        if hasattr(self, "db"):
            # Already initialized by the SingletonPerName mechanism; a second
            # construction cannot change the database configuration.
            if database_url is not None or database_file is not None:
                logger.warning(
                    f"Tru was already initialized. Cannot change database_url={database_url} or database_file={database_file} ."
                )

            return

        assert None in (database_url, database_file), \
            "Please specify at most one of `database_url` and `database_file`"

        if database_file:
            warnings.warn(
                DeprecationWarning(
                    "`database_file` is deprecated, use `database_url` instead as in `database_url='sqlite:///filename'."
                )
            )

        if database_url is None:
            database_url = f"sqlite:///{database_file or self.DEFAULT_DATABASE_FILE}"

        self.db: SqlAlchemyDB = SqlAlchemyDB.from_db_url(
            database_url, redact_keys=database_redact_keys
        )

        print(
            f"{UNICODE_SQUID} Tru initialized with db url {self.db.engine.url} ."
        )
        if database_redact_keys:
            print(
                f"{UNICODE_LOCK} Secret keys will not be included in the database."
            )
        else:
            print(
                f"{UNICODE_STOP} Secret keys may be written to the database. "
                "See the `database_redact_keys` option of `Tru` to prevent this."
            )

    def reset_database(self):
        """
        Reset the database. Clears all tables.
        """

        self.db.reset_database()

    def migrate_database(self):
        """
        Migrates the database. This should be run whenever there are breaking
        changes in a database created with an older version of trulens_eval.
        """

        self.db.migrate_database()

    def add_record(self, record: Optional[Record] = None, **kwargs):
        """
        Add a record to the database.

        Args:

            record: Record

            **kwargs: Record fields to set or override on the given record.

        Returns:
            RecordID: Unique record identifier.

        """

        if record is None:
            record = Record(**kwargs)
        else:
            record.update(**kwargs)

        return self.db.insert_record(record=record)

    # Alias kept for backwards compatibility.
    update_record = add_record

    def run_feedback_functions(
        self,
        record: Record,
        feedback_functions: Sequence[Feedback],
        app: Optional[AppDefinition] = None,
    ) -> Sequence[JSON]:
        """
        Run a collection of feedback functions and report their result.

        Parameters:

            record (Record): The record on which to evaluate the feedback
            functions.

            app (App, optional): The app that produced the given record.
            If not provided, it is looked up from the given database `db`.

            feedback_functions (Sequence[Feedback]): A collection of feedback
            functions to evaluate.

        Returns:
            Sequence[JSON]: Results of the feedback function evaluations, in
            the same order as `feedback_functions`.
        """

        app_id = record.app_id

        if app is None:
            app = self.db.get_app(app_id=app_id)
            if app is None:
                raise RuntimeError(
                    f"App {app_id} not present in db. "
                    "Either add it with `tru.add_app` or provide `app_json` to `tru.run_feedback_functions`."
                )

        else:
            assert app_id == app.app_id, "Record was produced by a different app."

            if self.db.get_app(app_id=app.app_id) is None:
                logger.warning(
                    "App %s was not present in database. Adding it.", app_id
                )
                self.add_app(app=app)

        evals = []

        # NOTE: `func` is passed as a promise argument instead of being
        # captured by the lambda to avoid the late-binding closure pitfall.
        for func in feedback_functions:
            evals.append(
                TP().promise(lambda f: f.run(app=app, record=record), func)
            )

        evals = map(lambda p: p.get(), evals)

        return list(evals)

    def add_app(self, app: AppDefinition) -> None:
        """
        Add an app to the database.
        """

        self.db.insert_app(app=app)

    def add_feedback(
        self, feedback_result: Optional[FeedbackResult] = None, **kwargs
    ) -> None:
        """
        Add a single feedback result to the database.
        """

        if feedback_result is None:
            feedback_result = FeedbackResult(**kwargs)
        else:
            feedback_result.update(**kwargs)

        self.db.insert_feedback(feedback_result=feedback_result)

    def add_feedbacks(self, feedback_results: Iterable[FeedbackResult]) -> None:
        """
        Add multiple feedback results to the database.
        """

        for feedback_result in feedback_results:
            self.add_feedback(feedback_result=feedback_result)

    def get_app(self, app_id: Optional[str] = None) -> JSON:
        """
        Look up an app from the database.
        """

        # TODO: unserialize
        return self.db.get_app(app_id)

    def get_records_and_feedback(self, app_ids: List[str]):
        """
        Get records, their feedback results, and feedback names from the
        database. Pass an empty list of app_ids to return all.

        ```python
        tru.get_records_and_feedback(app_ids=[])
        ```
        """

        df, feedback_columns = self.db.get_records_and_feedback(app_ids)

        return df, feedback_columns

    def get_leaderboard(self, app_ids: List[str]):
        """
        Get a leaderboard by app id from the
        database. Pass an empty list of app_ids to return all.

        ```python
        tru.get_leaderboard(app_ids=[])
        ```
        """
        df, feedback_cols = self.db.get_records_and_feedback(app_ids)

        col_agg_list = feedback_cols + ['latency', 'total_cost']

        leaderboard = df.groupby('app_id')[col_agg_list].mean().sort_values(
            by=feedback_cols, ascending=False
        )

        return leaderboard

    def start_evaluator(self,
                        restart=False,
                        fork=False) -> Union[Process, Thread]:
        """
        Start a deferred feedback function evaluation thread.

        Args:

            - restart: bool: Stop any running evaluator first.

            - fork: bool: Run the evaluator in a separate Process instead of a
              Thread. Not yet implemented.
        """

        assert not fork, "Fork mode not yet implemented."

        if self.evaluator_proc is not None:
            if restart:
                self.stop_evaluator()
            else:
                raise RuntimeError(
                    "Evaluator is already running in this process."
                )

        from trulens_eval.feedback import Feedback

        if not fork:
            # Event used by stop_evaluator to signal the thread to exit.
            self.evaluator_stop = threading.Event()

        def runloop():
            while fork or not self.evaluator_stop.is_set():
                started_count = Feedback.evaluate_deferred(tru=self)

                if started_count > 0:
                    print(
                        f"{UNICODE_YIELD}{UNICODE_YIELD}{UNICODE_YIELD} Started {started_count} deferred feedback functions."
                    )
                    TP().finish()
                    print(
                        f"{UNICODE_CHECK}{UNICODE_CHECK}{UNICODE_CHECK} Finished evaluating deferred feedback functions."
                    )

                if fork:
                    sleep(10)
                else:
                    # Interruptible sleep: wakes early if stop is requested.
                    self.evaluator_stop.wait(10)

            print("Evaluator stopped.")

        if fork:
            proc = Process(target=runloop)
        else:
            proc = Thread(target=runloop)
            proc.daemon = True

        # Start a persistent thread or process that evaluates feedback functions.

        self.evaluator_proc = proc
        proc.start()

        return proc

    def stop_evaluator(self):
        """
        Stop the deferred feedback evaluation thread.
        """

        if self.evaluator_proc is None:
            raise RuntimeError("Evaluator not running this process.")

        if isinstance(self.evaluator_proc, Process):
            self.evaluator_proc.terminate()

        elif isinstance(self.evaluator_proc, Thread):
            self.evaluator_stop.set()
            self.evaluator_proc.join()
            self.evaluator_stop = None

        self.evaluator_proc = None

    def stop_dashboard(self, force: bool = False) -> None:
        """
        Stop existing dashboard(s) if running.

        Args:

            - force: bool: Also try to find any other dashboard processes not
              started in this notebook and shut them down too.

        Raises:

            - ValueError: Dashboard is not running.
        """
        if Tru.dashboard_proc is None:
            if not force:
                raise ValueError(
                    "Dashboard not running in this workspace. "
                    "You may be able to shut other instances by setting the `force` flag."
                )

            else:
                if sys.platform.startswith("win"):
                    raise RuntimeError(
                        "Force stop option is not supported on windows."
                    )

                print("Force stopping dashboard ...")
                import os
                import pwd  # PROBLEM: does not exist on windows

                import psutil
                username = pwd.getpwuid(os.getuid())[0]
                for p in psutil.process_iter():
                    try:
                        cmd = " ".join(p.cmdline())
                        if "streamlit" in cmd and "Leaderboard.py" in cmd and p.username(
                        ) == username:
                            print(f"killing {p}")
                            p.kill()
                    except Exception:
                        # Best-effort: skip processes we cannot inspect or kill.
                        continue

        else:
            Tru.dashboard_proc.kill()
            Tru.dashboard_proc = None

    def run_dashboard_in_jupyter(self):
        """
        Experimental: run the dashboard inline in a notebook instead of as a
        separate process.
        """
        # TODO: check for jupyter

        logger.warning("Running dashboard inside a notebook is an experimental feature and may not work well.")

        from streamlit_jupyter import StreamlitPatcher
        StreamlitPatcher().jupyter()
        from trulens_eval import Leaderboard

        Leaderboard.main()

    def run_dashboard(
        self, force: bool = False, _dev: Optional[Path] = None
    ) -> Process:
        """
        Run a streamlit dashboard to view logged results and apps.

        Args:

            - force: bool: Stop existing dashboard(s) first.

            - _dev: Optional[Path]: If given, run dashboard with the given
              PYTHONPATH. This can be used to run the dashboard from outside of
              its pip package installation folder.

        Raises:

            - ValueError: Dashboard is already running.

        Returns:

            - Process: Process containing streamlit dashboard.
        """

        if force:
            self.stop_dashboard(force=force)

        print("Starting dashboard ...")

        # Create .streamlit directory if it doesn't exist
        streamlit_dir = os.path.join(os.getcwd(), '.streamlit')
        os.makedirs(streamlit_dir, exist_ok=True)

        # Create config.toml file path
        config_path = os.path.join(streamlit_dir, 'config.toml')

        # Check if the file already exists
        if not os.path.exists(config_path):
            with open(config_path, 'w') as f:
                f.write('[theme]\n')
                f.write('primaryColor="#0A2C37"\n')
                f.write('backgroundColor="#FFFFFF"\n')
                # "#" prefix is required for hex colors in streamlit config.
                f.write('secondaryBackgroundColor="#F5F5F5"\n')
                f.write('textColor="#0A2C37"\n')
                f.write('font="sans serif"\n')
        else:
            print("Config file already exists. Skipping writing process.")

        # Create credentials.toml file path
        cred_path = os.path.join(streamlit_dir, 'credentials.toml')

        # Check if the file already exists
        if not os.path.exists(cred_path):
            with open(cred_path, 'w') as f:
                f.write('[general]\n')
                f.write('email=""\n')
        else:
            print("Credentials file already exists. Skipping writing process.")

        #run leaderboard with subprocess
        leaderboard_path = pkg_resources.resource_filename(
            'trulens_eval', 'Leaderboard.py'
        )

        if Tru.dashboard_proc is not None:
            print("Dashboard already running at path:", Tru.dashboard_urls)
            return Tru.dashboard_proc

        env_opts = {}
        if _dev is not None:
            env_opts['env'] = os.environ
            env_opts['env']['PYTHONPATH'] = str(_dev)

        proc = subprocess.Popen(
            [
                "streamlit", "run", "--server.headless=True", leaderboard_path,
                "--", "--database-url",
                self.db.engine.url.render_as_string(hide_password=False)
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            **env_opts
        )

        started = threading.Event()
        tunnel_started = threading.Event()
        if is_notebook():
            out_stdout, out_stderr = setup_widget_stdout_stderr()
        else:
            out_stdout = None
            out_stderr = None

        IN_COLAB = 'google.colab' in sys.modules
        if IN_COLAB:
            # Colab cannot serve localhost directly; expose the dashboard via
            # a localtunnel subprocess.
            tunnel_proc = subprocess.Popen(
                ["npx", "localtunnel", "--port", "8501"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                **env_opts
            )

            def listen_to_tunnel(proc: subprocess.Popen, pipe, out, started):
                while proc.poll() is None:

                    line = pipe.readline()
                    if "url" in line:
                        started.set()
                        line = "Go to this url and submit the ip given here. " + line

                    if out is not None:
                        out.append_stdout(line)

                    else:
                        print(line)

            Tru.tunnel_listener_stdout = Thread(
                target=listen_to_tunnel,
                args=(
                    tunnel_proc, tunnel_proc.stdout, out_stdout, tunnel_started
                )
            )
            Tru.tunnel_listener_stderr = Thread(
                target=listen_to_tunnel,
                args=(
                    tunnel_proc, tunnel_proc.stderr, out_stderr, tunnel_started
                )
            )
            Tru.tunnel_listener_stdout.daemon = True
            Tru.tunnel_listener_stderr.daemon = True
            Tru.tunnel_listener_stdout.start()
            Tru.tunnel_listener_stderr.start()
            if not tunnel_started.wait(timeout=DASHBOARD_START_TIMEOUT
                                      ):  # This might not work on windows.
                raise RuntimeError("Tunnel failed to start in time. ")

        def listen_to_dashboard(proc: subprocess.Popen, pipe, out, started):
            while proc.poll() is None:
                line = pipe.readline()
                if IN_COLAB:
                    if "External URL: " in line:
                        started.set()
                        line = line.replace(
                            "External URL: http://", "Submit this IP Address: "
                        )
                        line = line.replace(":8501", "")
                        if out is not None:
                            out.append_stdout(line)
                        else:
                            print(line)
                        Tru.dashboard_urls = line  # store the url when dashboard is started
                else:
                    if "Network URL: " in line:
                        url = line.split(": ")[1]
                        url = url.rstrip()
                        print(f"Dashboard started at {url} .")
                        started.set()
                        Tru.dashboard_urls = line  # store the url when dashboard is started
                    if out is not None:
                        out.append_stdout(line)
                    else:
                        print(line)
            if out is not None:
                out.append_stdout("Dashboard closed.")
            else:
                print("Dashboard closed.")

        Tru.dashboard_listener_stdout = Thread(
            target=listen_to_dashboard,
            args=(proc, proc.stdout, out_stdout, started)
        )
        Tru.dashboard_listener_stderr = Thread(
            target=listen_to_dashboard,
            args=(proc, proc.stderr, out_stderr, started)
        )

        # Purposely block main process from ending and wait for dashboard.
        Tru.dashboard_listener_stdout.daemon = False
        Tru.dashboard_listener_stderr.daemon = False

        Tru.dashboard_listener_stdout.start()
        Tru.dashboard_listener_stderr.start()

        Tru.dashboard_proc = proc

        wait_period = DASHBOARD_START_TIMEOUT
        if IN_COLAB:
            # Need more time to setup 2 processes tunnel and dashboard
            wait_period = wait_period * 3
        if not started.wait(timeout=wait_period
                           ):  # This might not work on windows.
            raise RuntimeError(
                "Dashboard failed to start in time. "
                "Please inspect dashboard logs for additional information."
            )

        return proc

    # Alias kept for backwards compatibility.
    start_dashboard = run_dashboard

Chain(chain, **kwargs)

Create a TruChain with database managed by self.

Source code in trulens_eval/trulens_eval/tru.py
58
59
60
61
62
63
64
65
def Chain(self, chain, **kwargs):
    """
    Create a TruChain with database managed by self.
    """

    from trulens_eval.tru_chain import TruChain

    return TruChain(tru=self, app=chain, **kwargs)

Llama(engine, **kwargs)

Create a llama_index engine with database managed by self.

Source code in trulens_eval/trulens_eval/tru.py
67
68
69
70
71
72
73
74
def Llama(self, engine, **kwargs):
    """
    Create a llama_index engine with database managed by self.
    """

    from trulens_eval.tru_llama import TruLlama

    return TruLlama(tru=self, app=engine, **kwargs)

__init__(database_url=None, database_file=None, database_redact_keys=False)

TruLens instrumentation, logging, and feedback functions for apps.

Parameters:

Name Type Description Default
database_url Optional[str]

SQLAlchemy database URL. Defaults to a local SQLite database file at 'default.sqlite'. See this article on SQLAlchemy database URLs.

None
database_file Optional[str]

(Deprecated) Path to a local SQLite database file

None
database_redact_keys bool

whether to redact secret keys in data to be written to database.

False
Source code in trulens_eval/trulens_eval/tru.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def __init__(
    self,
    database_url: Optional[str] = None,
    database_file: Optional[str] = None,
    database_redact_keys: bool = False
):
    """
    TruLens instrumentation, logging, and feedback functions for apps.

    Args:
       database_url: SQLAlchemy database URL. Defaults to a local
                            SQLite database file at 'default.sqlite'
                            See [this article](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls)
                            on SQLAlchemy database URLs.
       database_file: (Deprecated) Path to a local SQLite database file
       database_redact_keys: whether to redact secret keys in data to be written to database.
    """
    if hasattr(self, "db"):
        if database_url is not None or database_file is not None:
            logger.warning(
                f"Tru was already initialized. Cannot change database_url={database_url} or database_file={database_file} ."
            )

        # Already initialized by SingletonByName mechanism.
        return

    assert None in (database_url, database_file), \
        "Please specify at most one of `database_url` and `database_file`"

    if database_file:
        warnings.warn(
            DeprecationWarning(
                "`database_file` is deprecated, use `database_url` instead as in `database_url='sqlite:///filename'."
            )
        )

    if database_url is None:
        database_url = f"sqlite:///{database_file or self.DEFAULT_DATABASE_FILE}"

    self.db: SqlAlchemyDB = SqlAlchemyDB.from_db_url(
        database_url, redact_keys=database_redact_keys
    )

    print(
        f"{UNICODE_SQUID} Tru initialized with db url {self.db.engine.url} ."
    )
    if database_redact_keys:
        print(
            f"{UNICODE_LOCK} Secret keys will not be included in the database."
        )
    else:
        print(
            f"{UNICODE_STOP} Secret keys may be written to the database. "
            "See the `database_redact_keys` option of `Tru` to prevent this."
        )

add_app(app)

Add an app to the database.

Source code in trulens_eval/trulens_eval/tru.py
224
225
226
227
228
229
def add_app(self, app: AppDefinition) -> None:
    """
    Add a app to the database.        
    """

    self.db.insert_app(app=app)

add_feedback(feedback_result=None, **kwargs)

Add a single feedback result to the database.

Source code in trulens_eval/trulens_eval/tru.py
231
232
233
234
235
236
237
238
239
240
241
242
243
def add_feedback(
    self, feedback_result: FeedbackResult = None, **kwargs
) -> None:
    """
    Add a single feedback result to the database.
    """

    if feedback_result is None:
        feedback_result = FeedbackResult(**kwargs)
    else:
        feedback_result.update(**kwargs)

    self.db.insert_feedback(feedback_result=feedback_result)

add_feedbacks(feedback_results)

Add multiple feedback results to the database.

Source code in trulens_eval/trulens_eval/tru.py
245
246
247
248
249
250
251
def add_feedbacks(self, feedback_results: Iterable[FeedbackResult]) -> None:
    """
    Add multiple feedback results to the database.
    """

    for feedback_result in feedback_results:
        self.add_feedback(feedback_result=feedback_result)

add_record(record=None, **kwargs)

Add a record to the database.

Parameters:

Name Type Description Default
record Optional[Record]

Record

None
**kwargs

Record fields.

{}

Returns:

Name Type Description
RecordID

Unique record identifier.

Source code in trulens_eval/trulens_eval/tru.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def add_record(self, record: Optional[Record] = None, **kwargs):
    """
    Add a record to the database.

    Args:

        record: Record

        **kwargs: Record fields.

    Returns:
        RecordID: Unique record identifier.

    """

    if record is None:
        record = Record(**kwargs)
    else:
        record.update(**kwargs)

    return self.db.insert_record(record=record)

get_app(app_id=None)

Look up an app from the database.

Source code in trulens_eval/trulens_eval/tru.py
253
254
255
256
257
258
259
def get_app(self, app_id: Optional[str] = None) -> JSON:
    """
    Look up a app from the database.
    """

    # TODO: unserialize
    return self.db.get_app(app_id)

get_leaderboard(app_ids)

Get a leaderboard by app id from the database. Pass an empty list of app_ids to return all.

tru.get_leaderboard(app_ids=[])
Source code in trulens_eval/trulens_eval/tru.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def get_leaderboard(self, app_ids: List[str]):
    """
    Get a leaderboard by app id from the
    database. Pass an empty list of app_ids to return all.

    ```python
    tru.get_leaderboard(app_ids=[])
    ```
    """
    df, feedback_cols = self.db.get_records_and_feedback(app_ids)

    col_agg_list = feedback_cols + ['latency', 'total_cost']

    leaderboard = df.groupby('app_id')[col_agg_list].mean().sort_values(
        by=feedback_cols, ascending=False
    )

    return leaderboard

get_records_and_feedback(app_ids)

Get records, their feedback results, and feedback names from the database. Pass an empty list of app_ids to return all.

tru.get_records_and_feedback(app_ids=[])
Source code in trulens_eval/trulens_eval/tru.py
261
262
263
264
265
266
267
268
269
270
271
272
273
def get_records_and_feedback(self, app_ids: List[str]):
    """
    Get records, their feeback results, and feedback names from the
    database. Pass an empty list of app_ids to return all.

    ```python
    tru.get_records_and_feedback(app_ids=[])
    ```
    """

    df, feedback_columns = self.db.get_records_and_feedback(app_ids)

    return df, feedback_columns

migrate_database()

Migrates the database. This should be run whenever there are breaking changes in a database created with an older version of trulens_eval.

Source code in trulens_eval/trulens_eval/tru.py
139
140
141
142
143
144
145
def migrate_database(self):
    """
    Migrates the database. This should be run whenever there are breaking
    changes in a database created with an older version of trulens_eval.
    """

    self.db.migrate_database()

reset_database()

Reset the database. Clears all tables.

Source code in trulens_eval/trulens_eval/tru.py
132
133
134
135
136
137
def reset_database(self):
    """
    Reset the database. Clears all tables.
    """

    self.db.reset_database()

run_dashboard(force=False, _dev=None)

Run a streamlit dashboard to view logged results and apps.

Parameters:

Name Type Description Default
- force

bool: Stop existing dashboard(s) first.

required
- _dev

Optional[Path]: If given, run dashboard with the given PYTHONPATH. This can be used to run the dashboard from outside of its pip package installation folder.

required

Raises:

Type Description
-ValueError

Dashboard is already running.

Returns:

Type Description
Process
  • Process: Process containing streamlit dashboard.
Source code in trulens_eval/trulens_eval/tru.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def run_dashboard(
    self, force: bool = False, _dev: Optional[Path] = None
) -> Process:
    """
    Run a streamlit dashboard to view logged results and apps.

    Args:

        - force: bool: Stop existing dashboard(s) first.

        - _dev: Optional[Path]: If given, run dashboard with the given
          PYTHONPATH. This can be used to run the dashboard from outside of
          its pip package installation folder.

    Raises:

        - ValueError: Dashboard is already running (raised via
          `stop_dashboard` when `force` is set but nothing can be stopped).

        - RuntimeError: Dashboard (or, in Colab, the tunnel) failed to
          start within the timeout.

    Returns:

        - Process: Process containing streamlit dashboard.
    """

    if force:
        self.stop_dashboard(force=force)

    print("Starting dashboard ...")

    # Ensure a .streamlit directory exists in the working directory so we
    # can provide theme/credentials config to the dashboard process.
    streamlit_dir = os.path.join(os.getcwd(), '.streamlit')
    os.makedirs(streamlit_dir, exist_ok=True)

    # Create config.toml file path
    config_path = os.path.join(streamlit_dir, 'config.toml')

    # Only write the theme config once; never clobber a user's edits.
    if not os.path.exists(config_path):
        with open(config_path, 'w') as f:
            f.write('[theme]\n')
            f.write('primaryColor="#0A2C37"\n')
            f.write('backgroundColor="#FFFFFF"\n')
            # BUG FIX: the leading '#' was missing here, producing an
            # invalid hex color in the streamlit theme config.
            f.write('secondaryBackgroundColor="#F5F5F5"\n')
            f.write('textColor="#0A2C37"\n')
            f.write('font="sans serif"\n')
    else:
        print("Config file already exists. Skipping writing process.")

    # Create credentials.toml file path
    cred_path = os.path.join(streamlit_dir, 'credentials.toml')

    # Empty email suppresses streamlit's first-run signup prompt.
    if not os.path.exists(cred_path):
        with open(cred_path, 'w') as f:
            f.write('[general]\n')
            f.write('email=""\n')
    else:
        print("Credentials file already exists. Skipping writing process.")

    # Locate the Leaderboard app shipped inside the trulens_eval package.
    leaderboard_path = pkg_resources.resource_filename(
        'trulens_eval', 'Leaderboard.py'
    )

    if Tru.dashboard_proc is not None:
        print("Dashboard already running at path:", Tru.dashboard_urls)
        return Tru.dashboard_proc

    env_opts = {}
    if _dev is not None:
        # BUG FIX: copy the environment instead of mutating os.environ,
        # which leaked PYTHONPATH into the current process.
        env = os.environ.copy()
        env['PYTHONPATH'] = str(_dev)
        env_opts['env'] = env

    # Launch streamlit headless, pointing it at our database.
    proc = subprocess.Popen(
        [
            "streamlit", "run", "--server.headless=True", leaderboard_path,
            "--", "--database-url",
            self.db.engine.url.render_as_string(hide_password=False)
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        **env_opts
    )

    started = threading.Event()
    tunnel_started = threading.Event()
    if is_notebook():
        out_stdout, out_stderr = setup_widget_stdout_stderr()
    else:
        out_stdout = None
        out_stderr = None

    IN_COLAB = 'google.colab' in sys.modules
    if IN_COLAB:
        # Colab cannot expose localhost directly; tunnel port 8501 out.
        tunnel_proc = subprocess.Popen(
            ["npx", "localtunnel", "--port", "8501"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            **env_opts
        )

        def listen_to_tunnel(proc: subprocess.Popen, pipe, out, started):
            # Relay tunnel output; signal `started` once the url appears.
            while proc.poll() is None:

                line = pipe.readline()
                if "url" in line:
                    started.set()
                    line = "Go to this url and submit the ip given here. " + line

                if out is not None:
                    out.append_stdout(line)

                else:
                    print(line)

        Tru.tunnel_listener_stdout = Thread(
            target=listen_to_tunnel,
            args=(
                tunnel_proc, tunnel_proc.stdout, out_stdout, tunnel_started
            )
        )
        Tru.tunnel_listener_stderr = Thread(
            target=listen_to_tunnel,
            args=(
                tunnel_proc, tunnel_proc.stderr, out_stderr, tunnel_started
            )
        )
        Tru.tunnel_listener_stdout.daemon = True
        Tru.tunnel_listener_stderr.daemon = True
        Tru.tunnel_listener_stdout.start()
        Tru.tunnel_listener_stderr.start()
        # This might not work on windows.
        if not tunnel_started.wait(timeout=DASHBOARD_START_TIMEOUT):
            raise RuntimeError("Tunnel failed to start in time. ")

    def listen_to_dashboard(proc: subprocess.Popen, pipe, out, started):
        # Relay dashboard output; signal `started` and record the url
        # once streamlit prints its serving address.
        while proc.poll() is None:
            line = pipe.readline()
            if IN_COLAB:
                if "External URL: " in line:
                    started.set()
                    line = line.replace(
                        "External URL: http://", "Submit this IP Address: "
                    )
                    line = line.replace(":8501", "")
                    if out is not None:
                        out.append_stdout(line)
                    else:
                        print(line)
                    Tru.dashboard_urls = line  # store the url when dashboard is started
            else:
                if "Network URL: " in line:
                    url = line.split(": ")[1]
                    url = url.rstrip()
                    print(f"Dashboard started at {url} .")
                    started.set()
                    Tru.dashboard_urls = line  # store the url when dashboard is started
                if out is not None:
                    out.append_stdout(line)
                else:
                    print(line)
        if out is not None:
            out.append_stdout("Dashboard closed.")
        else:
            print("Dashboard closed.")

    Tru.dashboard_listener_stdout = Thread(
        target=listen_to_dashboard,
        args=(proc, proc.stdout, out_stdout, started)
    )
    Tru.dashboard_listener_stderr = Thread(
        target=listen_to_dashboard,
        args=(proc, proc.stderr, out_stderr, started)
    )

    # Purposely block main process from ending and wait for dashboard.
    Tru.dashboard_listener_stdout.daemon = False
    Tru.dashboard_listener_stderr.daemon = False

    Tru.dashboard_listener_stdout.start()
    Tru.dashboard_listener_stderr.start()

    Tru.dashboard_proc = proc

    wait_period = DASHBOARD_START_TIMEOUT
    if IN_COLAB:
        # Need more time to setup 2 processes tunnel and dashboard
        wait_period = wait_period * 3
    # This might not work on windows.
    if not started.wait(timeout=wait_period):
        raise RuntimeError(
            "Dashboard failed to start in time. "
            "Please inspect dashboard logs for additional information."
        )

    return proc

run_feedback_functions(record, feedback_functions, app=None)

Run a collection of feedback functions and report their result.

Parameters:

Name Type Description Default
record Record

The record on which to evaluate the feedback

required
app App

The app that produced the given record.

None
feedback_functions Sequence[Feedback]

A collection of feedback

required

Returns a list with the result of each feedback function, in the same order as feedback_functions.

Source code in trulens_eval/trulens_eval/tru.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def run_feedback_functions(
    self,
    record: Record,
    feedback_functions: Sequence[Feedback],
    app: Optional[AppDefinition] = None,
) -> Sequence[JSON]:
    """
    Run a collection of feedback functions and report their result.

    Parameters:

        record (Record): The record on which to evaluate the feedback
        functions.

        app (App, optional): The app that produced the given record.
        If not provided, it is looked up from the given database `db`.

        feedback_functions (Sequence[Feedback]): A collection of feedback
        functions to evaluate.

    Returns:

        Sequence[JSON]: One result per feedback function, in the same
        order as `feedback_functions`.
    """

    app_id = record.app_id

    if app is None:
        app = self.db.get_app(app_id=app_id)
        if app is None:
            # BUG FIX: was a plain string, so "{app_id}" appeared
            # literally in the error message; interpolate the real id.
            raise RuntimeError(
                f"App {app_id} not present in db. "
                "Either add it with `tru.add_app` or provide `app_json` to `tru.run_feedback_functions`."
            )

    else:
        assert app_id == app.app_id, "Record was produced by a different app."

        if self.db.get_app(app_id=app.app_id) is None:
            # BUG FIX: the braces were previously logged verbatim; use
            # lazy %-style args per logging convention.
            logger.warning(
                "App %s was not present in database. Adding it.", app_id
            )
            self.add_app(app=app)

    evals = []

    # Schedule each feedback function on the thread pool. `func` is bound
    # via the extra promise argument to avoid the late-binding lambda
    # pitfall.
    for func in feedback_functions:
        evals.append(
            TP().promise(lambda f: f.run(app=app, record=record), func)
        )

    # Block on each promise, collecting results in submission order.
    evals = map(lambda p: p.get(), evals)

    return list(evals)

start_evaluator(restart=False, fork=False)

Start a deferred feedback function evaluation thread.

Source code in trulens_eval/trulens_eval/tru.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def start_evaluator(self,
                    restart=False,
                    fork=False) -> Union[Process, Thread]:
    """
    Start a deferred feedback function evaluation thread.
    """

    assert not fork, "Fork mode not yet implemented."

    # Refuse to run two evaluators at once unless asked to restart.
    if self.evaluator_proc is not None:
        if not restart:
            raise RuntimeError(
                "Evaluator is already running in this process."
            )
        self.stop_evaluator()

    from trulens_eval.feedback import Feedback

    # Thread mode is stopped via an Event; fork mode (unimplemented)
    # would be terminated from outside instead.
    if not fork:
        self.evaluator_stop = threading.Event()

    def runloop():
        # Poll for deferred feedback work until signalled to stop.
        while fork or not self.evaluator_stop.is_set():
            started_count = Feedback.evaluate_deferred(tru=self)

            if started_count > 0:
                print(
                    f"{UNICODE_YIELD}{UNICODE_YIELD}{UNICODE_YIELD} Started {started_count} deferred feedback functions."
                )
                TP().finish()
                print(
                    f"{UNICODE_CHECK}{UNICODE_CHECK}{UNICODE_CHECK} Finished evaluating deferred feedback functions."
                )

            # Pause between polls; Event.wait wakes early when stopped.
            if fork:
                sleep(10)
            else:
                self.evaluator_stop.wait(10)

        print("Evaluator stopped.")

    if fork:
        worker = Process(target=runloop)
    else:
        worker = Thread(target=runloop)
        worker.daemon = True

    # Remember and launch the persistent evaluation worker.
    self.evaluator_proc = worker
    worker.start()

    return worker

stop_dashboard(force=False)

Stop existing dashboard(s) if running.

Parameters:

Name Type Description Default
- force

bool: Also try to find any other dashboard processes not started in this notebook and shut them down too.

False

Raises:

Type Description
ValueError

Dashboard is not running.

Source code in trulens_eval/trulens_eval/tru.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def stop_dashboard(self, force: bool = False) -> None:
    """
    Stop existing dashboard(s) if running.

    Args:

        - force: bool: Also try to find any other dashboard processes not
          started in this notebook and shut them down too.

    Raises:

        - ValueError: Dashboard is not running (and `force` was not set).

        - RuntimeError: `force` was requested on Windows, where it is not
          supported.
    """
    if Tru.dashboard_proc is None:
        if not force:
            raise ValueError(
                "Dashboard not running in this workspace. "
                "You may be able to shut other instances by setting the `force` flag."
            )

        if sys.platform.startswith("win"):
            raise RuntimeError(
                "Force stop option is not supported on windows."
            )

        print("Force stopping dashboard ...")
        # Local imports: `pwd` does not exist on windows (guarded above)
        # and `psutil` is only needed for this force path. The redundant
        # local `import os` was removed (os is imported at module level).
        import pwd

        import psutil

        # Only kill Leaderboard streamlit processes owned by this user.
        username = pwd.getpwuid(os.getuid())[0]
        for p in psutil.process_iter():
            try:
                cmd = " ".join(p.cmdline())
                if ("streamlit" in cmd and "Leaderboard.py" in cmd
                        and p.username() == username):
                    print(f"killing {p}")
                    p.kill()
            except Exception:
                # Best effort: processes may exit or deny access while we
                # iterate; skip those rather than fail the whole sweep.
                continue

    else:
        Tru.dashboard_proc.kill()
        Tru.dashboard_proc = None

stop_evaluator()

Stop the deferred feedback evaluation thread.

Source code in trulens_eval/trulens_eval/tru.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def stop_evaluator(self):
    """
    Stop the deferred feedback evaluation thread.
    """

    worker = self.evaluator_proc

    if worker is None:
        raise RuntimeError("Evaluator not running this process.")

    if isinstance(worker, Process):
        worker.terminate()
    elif isinstance(worker, Thread):
        # Signal the run loop to exit, then wait for it to finish.
        self.evaluator_stop.set()
        worker.join()
        self.evaluator_stop = None

    self.evaluator_proc = None