Source code for pipecat.observers.loggers.debug_log_observer

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD-2-Clause
#

from dataclasses import fields, is_dataclass
from enum import Enum, auto
from typing import Dict, Optional, Set, Tuple, Type, Union

from loguru import logger

from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection


class FrameEndpoint(Enum):
    """Specifies which endpoint (source or destination) to filter on."""

    SOURCE = auto()
    DESTINATION = auto()

class DebugLogObserver(BaseObserver):
    """Observer that logs frame activity with detailed content to the console.

    Automatically extracts and formats data from any frame type, making it useful
    for debugging pipeline behavior without needing frame-specific observers.

    Args:
        frame_types: Optional tuple of frame types to log, or a dict with frame
            type filters. If None, logs all frame types.
        exclude_fields: Optional set of field names to exclude from logging.

    Examples:
        Log all frames from all services:

        ```python
        observers=[DebugLogObserver()],
        ```

        Log specific frame types from any source/destination:

        ```python
        from pipecat.frames.frames import LLMTextFrame, TranscriptionFrame

        observers=[
            DebugLogObserver(frame_types=(LLMTextFrame, TranscriptionFrame)),
        ],
        ```

        Log frames with specific source/destination filters:

        ```python
        from pipecat.frames.frames import StartInterruptionFrame, UserStartedSpeakingFrame, LLMTextFrame
        from pipecat.transports.base_output_transport import BaseOutputTransport
        from pipecat.services.stt_service import STTService

        observers=[
            DebugLogObserver(
                frame_types={
                    # Only log StartInterruptionFrame when source is BaseOutputTransport
                    StartInterruptionFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
                    # Only log UserStartedSpeakingFrame when destination is STTService
                    UserStartedSpeakingFrame: (STTService, FrameEndpoint.DESTINATION),
                    # Log LLMTextFrame regardless of source or destination type
                    LLMTextFrame: None,
                }
            ),
        ],
        ```
    """

    def __init__(
        self,
        frame_types: Optional[
            Union[
                Tuple[Type[Frame], ...],
                Dict[Type[Frame], Optional[Tuple[Type, FrameEndpoint]]],
            ]
        ] = None,
        exclude_fields: Optional[Set[str]] = None,
        **kwargs,
    ):
        """Initialize the debug log observer.

        Args:
            frame_types: Tuple of frame types to log, or a dict mapping frame types
                to filter configurations. Filter configs can be:

                - None to log all instances of the frame type
                - A tuple of (service_type, endpoint) to filter on a specific
                  service and endpoint (SOURCE or DESTINATION)

                If None is provided instead of a tuple/dict, log all frames.
            exclude_fields: Set of field names to exclude from logging. If None,
                only binary data fields are excluded.
        """
        super().__init__(**kwargs)

        # Process frame filters
        self.frame_filters = {}
        if frame_types is not None:
            if isinstance(frame_types, tuple):
                # Tuple of frame types - log all instances
                self.frame_filters = {frame_type: None for frame_type in frame_types}
            else:
                # Dict of frame types with filters
                self.frame_filters = frame_types

        # By default, exclude binary data fields that would clutter logs
        self.exclude_fields = (
            exclude_fields
            if exclude_fields is not None
            else {
                "audio",  # Skip binary audio data
                "image",  # Skip binary image data
                "images",  # Skip lists of images
            }
        )

    def _format_value(self, value):
        """Format a value for logging.

        Args:
            value: The value to format.

        Returns:
            str: A string representation of the value suitable for logging.
""" if value is None: return "None" elif isinstance(value, str): return f"{value!r}" elif isinstance(value, (list, tuple)): if len(value) == 0: return "[]" if isinstance(value[0], dict) and len(value) > 3: # For message lists, just show count return f"{len(value)} items" return str(value) elif isinstance(value, (bytes, bytearray)): return f"{len(value)} bytes" elif hasattr(value, "get_messages_for_logging") and callable( getattr(value, "get_messages_for_logging") ): # Special case for OpenAI context return f"{value.__class__.__name__} with messages: {value.get_messages_for_logging()}" else: return str(value) def _should_log_frame(self, frame, src, dst): """Determine if a frame should be logged based on filters. Args: frame: The frame being processed src: The source component dst: The destination component Returns: bool: True if the frame should be logged, False otherwise """ # If no filters, log all frames if not self.frame_filters: return True # Check if this frame type is in our filters for frame_type, filter_config in self.frame_filters.items(): if isinstance(frame, frame_type): # If filter is None, log all instances of this frame type if filter_config is None: return True # Otherwise, check the specific filter service_type, endpoint = filter_config if endpoint == FrameEndpoint.SOURCE: return isinstance(src, service_type) elif endpoint == FrameEndpoint.DESTINATION: return isinstance(dst, service_type) return False
    async def on_push_frame(self, data: FramePushed):
        """Process a frame being pushed into the pipeline.

        Logs frame details to the console with all relevant fields and values.

        Args:
            data: Event data containing the frame, source, destination, direction,
                and timestamp.
        """
        src = data.source
        dst = data.destination
        frame = data.frame
        direction = data.direction
        timestamp = data.timestamp

        # Check if we should log this frame
        if not self._should_log_frame(frame, src, dst):
            return

        # Format direction arrow
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        time_sec = timestamp / 1_000_000_000

        class_name = frame.__class__.__name__

        # Build frame representation
        frame_details = []

        # If dataclass, extract fields
        if is_dataclass(frame):
            for field in fields(frame):
                if field.name in self.exclude_fields:
                    continue

                value = getattr(frame, field.name)
                if value is None:
                    continue

                formatted_value = self._format_value(value)
                frame_details.append(f"{field.name}: {formatted_value}")

        # Format the message
        if frame_details:
            details = ", ".join(frame_details)
            message = f"{class_name} {details} at {time_sec:.2f}s"
        else:
            message = f"{class_name} at {time_sec:.2f}s"

        # Log the message
        logger.debug(f"{src} {arrow} {dst}: {message}")
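
For orientation, the sketch below shows one way this observer is commonly wired into a pipeline. It is not part of this module: the `PipelineTask(..., observers=[...])` keyword and the `pipecat.pipeline.*` import paths are assumptions drawn from the wider Pipecat API, and `stt`, `llm`, and `tts` are hypothetical placeholder processors defined elsewhere in an application.

```python
# Usage sketch (not part of this module). Assumes PipelineTask accepts an
# `observers` keyword, as in the wider Pipecat API; `stt`, `llm`, and `tts`
# are hypothetical placeholder processors defined elsewhere in an app.
from pipecat.frames.frames import LLMTextFrame, TranscriptionFrame
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask

pipeline = Pipeline([stt, llm, tts])  # hypothetical processors

task = PipelineTask(
    pipeline,
    observers=[
        # Log only transcription and LLM text frames; binary audio/image
        # fields are already excluded by the observer's defaults.
        DebugLogObserver(frame_types=(LLMTextFrame, TranscriptionFrame)),
    ],
)
```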