Source code for pipecat.observers.loggers.user_bot_latency_log_observer

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import time

from loguru import logger

from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection


class UserBotLatencyLogObserver(BaseObserver):
    """Observer that logs the user-to-bot response latency.

    The latency is the time elapsed between the user stopping speaking
    (``UserStoppedSpeakingFrame``) and the bot starting to speak
    (``BotStartedSpeakingFrame``). This helps measure how quickly the AI
    services respond.

    Only one latency is logged per user turn: once the bot has started
    speaking, the pending timestamp is cleared until the user stops speaking
    again.
    """

    # Frame types this observer reacts to; every other frame is ignored
    # before any bookkeeping is done for it.
    _TRACKED_FRAME_TYPES = (
        UserStartedSpeakingFrame,
        UserStoppedSpeakingFrame,
        BotStartedSpeakingFrame,
    )

    def __init__(self):
        """Initialize the observer with no pending user-stopped timestamp."""
        super().__init__()
        # Ids of tracked frames that were already handled. The same frame can
        # be reported more than once (presumably once per processor hop), so
        # ids are remembered to react to each frame only the first time.
        self._processed_frames = set()
        # Monotonic timestamp of the last "user stopped speaking" event, or
        # None when there is no measurement in progress.
        self._user_stopped_time = None

    async def on_push_frame(self, data: FramePushed):
        """Track speaking events and log the user-to-bot latency.

        Args:
            data: Information about the pushed frame; only its ``frame`` and
                ``direction`` attributes are read.
        """
        # Only process downstream frames
        if data.direction != FrameDirection.DOWNSTREAM:
            return

        frame = data.frame

        # Filter by type *before* recording the id so the de-duplication set
        # only holds ids of speaking events rather than of every frame that
        # flows through the pipeline (which would grow without bound).
        if not isinstance(frame, self._TRACKED_FRAME_TYPES):
            return

        # Skip already processed frames
        if frame.id in self._processed_frames:
            return
        self._processed_frames.add(frame.id)

        if isinstance(frame, UserStartedSpeakingFrame):
            # The user is talking again: discard any pending measurement.
            self._user_stopped_time = None
        elif isinstance(frame, UserStoppedSpeakingFrame):
            # Use a monotonic clock: wall-clock time may jump (NTP, DST, manual
            # changes) and would corrupt the measured interval.
            self._user_stopped_time = time.monotonic()
        elif isinstance(frame, BotStartedSpeakingFrame) and self._user_stopped_time is not None:
            latency = time.monotonic() - self._user_stopped_time
            # Clear the timestamp so later bot utterances in the same turn do
            # not log a bogus, ever-growing latency.
            self._user_stopped_time = None
            logger.debug(f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING: {latency}")