#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from loguru import logger
from pipecat.utils.asyncio import TaskManager
try:
import sentry_sdk
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Sentry, you need to `pip install pipecat-ai[sentry]`.")
raise Exception(f"Missing module: {e}")
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
[docs]
class SentryMetrics(FrameProcessorMetrics):
    """Frame-processor metrics that are additionally reported to Sentry.

    A Sentry transaction is opened when a TTFB or processing measurement
    starts. When the measurement stops, the transaction is pushed onto an
    internal queue and finished by a background task, which calls the
    synchronous ``finish()`` in the event loop's default executor.

    If the Sentry SDK has not been initialized when the instance is created,
    a warning is logged and every Sentry-specific step is skipped; the
    base-class methods are still called.
    """

    def __init__(self):
        """Initialize the metrics object and detect whether Sentry is usable."""
        super().__init__()
        # Currently open transactions (None while no measurement is running).
        self._ttfb_metrics_tx = None
        self._processing_metrics_tx = None
        # Evaluated once here; later calls only consult this cached flag.
        self._sentry_available = sentry_sdk.is_initialized()
        if not self._sentry_available:
            logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
        # Queue of transactions waiting to be finished; None is the stop sentinel.
        self._sentry_queue = asyncio.Queue()
        self._sentry_task = None

    async def setup(self, task_manager: TaskManager):
        """Set up the base metrics and start the Sentry worker task.

        Args:
            task_manager: Task manager passed on to the base class; the worker
                task is created through ``self.task_manager``.
        """
        await super().setup(task_manager)
        if self._sentry_available:
            # Fresh queue for the worker started below.
            self._sentry_queue = asyncio.Queue()
            self._sentry_task = self.task_manager.create_task(
                self._sentry_task_handler(), name=f"{self}::_sentry_task_handler"
            )

    async def cleanup(self):
        """Stop the worker task and flush pending Sentry events.

        Does nothing beyond the base-class cleanup when no worker task is
        running.
        """
        await super().cleanup()
        if self._sentry_task:
            # The None sentinel makes the worker exit after it has drained
            # every transaction queued before it.
            await self._sentry_queue.put(None)
            await self.task_manager.wait_for_task(self._sentry_task)
            self._sentry_task = None
            logger.trace(f"{self} Flushing Sentry metrics")
            # flush() is synchronous and may wait for the whole timeout, so it
            # is run in the default executor (like tx.finish in the worker)
            # instead of stalling the event loop.
            await self.task_manager.get_event_loop().run_in_executor(
                None, lambda: sentry_sdk.flush(timeout=5.0)
            )

    async def start_ttfb_metrics(self, report_only_initial_ttfb):
        """Start the TTFB measurement and open a matching Sentry transaction.

        Args:
            report_only_initial_ttfb: Forwarded unchanged to the base class.
        """
        await super().start_ttfb_metrics(report_only_initial_ttfb)
        # _should_report_ttfb is not defined in this class; it is read here
        # after the base-class call (presumably maintained by the base class).
        if self._should_report_ttfb and self._sentry_available:
            self._ttfb_metrics_tx = sentry_sdk.start_transaction(
                op="ttfb",
                name=f"TTFB for {self._processor_name()}",
            )
            logger.debug(
                f"{self} Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})"
            )

    async def stop_ttfb_metrics(self):
        """Stop the TTFB measurement and queue its transaction for finishing."""
        await super().stop_ttfb_metrics()
        if self._sentry_available and self._ttfb_metrics_tx:
            await self._sentry_queue.put(self._ttfb_metrics_tx)
            self._ttfb_metrics_tx = None

    async def start_processing_metrics(self):
        """Start the processing measurement and open a matching Sentry transaction."""
        await super().start_processing_metrics()
        if self._sentry_available:
            self._processing_metrics_tx = sentry_sdk.start_transaction(
                op="processing",
                name=f"Processing for {self._processor_name()}",
            )
            logger.debug(
                f"{self} Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
            )

    async def stop_processing_metrics(self):
        """Stop the processing measurement and queue its transaction for finishing."""
        await super().stop_processing_metrics()
        if self._sentry_available and self._processing_metrics_tx:
            await self._sentry_queue.put(self._processing_metrics_tx)
            self._processing_metrics_tx = None

    async def _sentry_task_handler(self):
        """Finish queued transactions until the ``None`` sentinel is received."""
        while True:
            tx = await self._sentry_queue.get()
            if tx is None:
                # Sentinel queued by cleanup(): stop the worker.
                break
            # finish() is a synchronous SDK call, so it is run in the default
            # executor rather than on the event loop.
            await self.task_manager.get_event_loop().run_in_executor(None, tx.finish)