# Source code for pipecat.processors.metrics.frame_processor_metrics

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import time
from typing import Optional

from loguru import logger

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
    LLMTokenUsage,
    LLMUsageMetricsData,
    MetricsData,
    ProcessingMetricsData,
    TTFBMetricsData,
    TTSUsageMetricsData,
)
from pipecat.utils.asyncio import TaskManager
from pipecat.utils.base_object import BaseObject


class FrameProcessorMetrics(BaseObject):
    """Collect timing and usage metrics on behalf of a frame processor.

    The class measures two durations (TTFB and total processing time) using
    paired ``start_*``/``stop_*`` calls, and wraps LLM token usage and TTS
    character usage into metrics objects. Every ``stop_*``/usage method
    returns a ``MetricsFrame`` holding a single metrics entry; nothing is
    pushed anywhere by this class itself.

    A start timestamp of ``0`` is used as the "no measurement in progress"
    sentinel for both timers.
    """

    def __init__(self):
        super().__init__()
        self._task_manager: Optional[TaskManager] = None
        # Processor/model identity stamped on every metric. It stays None until
        # set_core_metrics_data() or set_processor_name() is called; defining
        # it here keeps the name helpers from failing with AttributeError.
        self._core_metrics_data: Optional[MetricsData] = None
        self._start_ttfb_time = 0
        self._start_processing_time = 0
        self._last_ttfb_time = 0
        self._should_report_ttfb = True

    async def setup(self, task_manager: TaskManager):
        """Store the task manager made available through ``task_manager``.

        Args:
            task_manager: The task manager to keep a reference to.
        """
        self._task_manager = task_manager

    async def cleanup(self):
        """Run the base-class cleanup; this class adds no cleanup of its own."""
        await super().cleanup()

    @property
    def task_manager(self) -> TaskManager:
        """Return the task manager given to ``setup()`` (``None`` before that)."""
        return self._task_manager

    @property
    def ttfb(self) -> Optional[float]:
        """Get the current TTFB value in seconds.

        Returns:
            Optional[float]: The last completed TTFB measurement if there is
            one; otherwise the time elapsed so far in an in-progress
            measurement; otherwise ``None``.
        """
        if self._last_ttfb_time > 0:
            return self._last_ttfb_time

        # If TTFB is in progress, calculate current value
        if self._start_ttfb_time > 0:
            return time.time() - self._start_ttfb_time

        return None

    def _processor_name(self) -> str:
        """Return the processor name used to label metrics and log lines.

        Falls back to ``str(self)`` when no core metrics data has been set
        yet, so metric methods stay usable before the processor is named.
        """
        if self._core_metrics_data is None:
            return str(self)
        return self._core_metrics_data.processor

    def _model_name(self) -> Optional[str]:
        """Return the model name from the core metrics data, or ``None`` if unset."""
        if self._core_metrics_data is None:
            return None
        return self._core_metrics_data.model

    def set_core_metrics_data(self, data: MetricsData):
        """Replace the core metrics data (processor and model identity).

        Args:
            data: The metrics data whose ``processor`` and ``model`` fields
                are copied into every metric produced afterwards.
        """
        self._core_metrics_data = data

    def set_processor_name(self, name: str):
        """Reset the core metrics data to a fresh one carrying only ``name``.

        Args:
            name: The processor name to report in subsequent metrics.
        """
        self._core_metrics_data = MetricsData(processor=name)

    async def start_ttfb_metrics(self, report_only_initial_ttfb: bool) -> None:
        """Begin a TTFB measurement if TTFB reporting is still enabled.

        Args:
            report_only_initial_ttfb: When true, TTFB reporting is disabled
                after this measurement starts, so later calls are no-ops.
        """
        if self._should_report_ttfb:
            # NOTE(review): wall-clock time is kept on purpose; a monotonic
            # clock would be sturdier, but code outside this class may set
            # these timestamps directly -- confirm before switching.
            self._start_ttfb_time = time.time()
            self._last_ttfb_time = 0
            # This method never switches the flag back on once it is off.
            self._should_report_ttfb = not report_only_initial_ttfb

    async def stop_ttfb_metrics(self) -> Optional[MetricsFrame]:
        """Finish the in-progress TTFB measurement.

        Returns:
            Optional[MetricsFrame]: A frame with one ``TTFBMetricsData`` entry
            holding the elapsed seconds, or ``None`` if no measurement was
            started.
        """
        if self._start_ttfb_time == 0:
            return None

        self._last_ttfb_time = time.time() - self._start_ttfb_time
        logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time}")
        ttfb = TTFBMetricsData(
            processor=self._processor_name(),
            value=self._last_ttfb_time,
            model=self._model_name(),
        )
        # Clear the start time so a second stop without a start is a no-op.
        self._start_ttfb_time = 0
        return MetricsFrame(data=[ttfb])

    async def start_processing_metrics(self) -> None:
        """Record the current time as the start of a processing measurement."""
        self._start_processing_time = time.time()

    async def stop_processing_metrics(self) -> Optional[MetricsFrame]:
        """Finish the in-progress processing-time measurement.

        Returns:
            Optional[MetricsFrame]: A frame with one ``ProcessingMetricsData``
            entry holding the elapsed seconds, or ``None`` if no measurement
            was started.
        """
        if self._start_processing_time == 0:
            return None

        value = time.time() - self._start_processing_time
        logger.debug(f"{self._processor_name()} processing time: {value}")
        processing = ProcessingMetricsData(
            processor=self._processor_name(),
            value=value,
            model=self._model_name(),
        )
        # Clear the start time so a second stop without a start is a no-op.
        self._start_processing_time = 0
        return MetricsFrame(data=[processing])

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage) -> MetricsFrame:
        """Wrap LLM token usage in a metrics frame and log the token counts.

        Args:
            tokens: Token usage to report; its ``prompt_tokens`` and
                ``completion_tokens`` fields are logged.

        Returns:
            MetricsFrame: A frame with one ``LLMUsageMetricsData`` entry.
        """
        logger.debug(
            f"{self._processor_name()} prompt tokens: {tokens.prompt_tokens}, "
            f"completion tokens: {tokens.completion_tokens}"
        )
        value = LLMUsageMetricsData(
            processor=self._processor_name(),
            model=self._model_name(),
            value=tokens,
        )
        return MetricsFrame(data=[value])

    async def start_tts_usage_metrics(self, text: str) -> MetricsFrame:
        """Wrap the character count of ``text`` in a metrics frame and log it.

        Args:
            text: The text whose length (``len(text)``) is reported as usage.

        Returns:
            MetricsFrame: A frame with one ``TTSUsageMetricsData`` entry.
        """
        characters = TTSUsageMetricsData(
            processor=self._processor_name(),
            model=self._model_name(),
            value=len(text),
        )
        logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
        return MetricsFrame(data=[characters])