#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Coroutine, List, Optional, Sequence
from loguru import logger
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
from pipecat.clocks.base_clock import BaseClock
from pipecat.frames.frames import (
CancelFrame,
ErrorFrame,
Frame,
FrameProcessorPauseFrame,
FrameProcessorPauseUrgentFrame,
FrameProcessorResumeFrame,
FrameProcessorResumeUrgentFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.asyncio import BaseTaskManager
from pipecat.utils.base_object import BaseObject
class FrameDirection(Enum):
    """Direction in which a frame travels between linked processors.

    ``DOWNSTREAM`` frames go to the next processor (``FrameProcessor._next``);
    ``UPSTREAM`` frames go to the previous one (``FrameProcessor._prev``).
    """

    # Toward the next processor in the chain.
    DOWNSTREAM = 1
    # Toward the previous processor in the chain.
    UPSTREAM = 2
@dataclass
class FrameProcessorSetup:
    """Runtime dependencies handed to ``FrameProcessor.setup()``.

    ``clock`` and ``task_manager`` are required; ``observer`` is optional.
    """

    # Clock whose ``get_time()`` timestamps frames as they are pushed.
    clock: BaseClock
    # Task manager used to create, cancel and watch the processor's tasks.
    task_manager: BaseTaskManager
    # Optional observer notified (``on_push_frame``) for every pushed frame.
    observer: Optional[BaseObserver] = None
class FrameProcessor(BaseObject):
    """Base class for processors that receive and push frames.

    Processors are chained with ``link()``: each one knows the previous
    (upstream) and next (downstream) processor. Non-system frames received via
    ``queue_frame()`` go to an input queue drained by a dedicated input task;
    non-system frames sent via ``push_frame()`` go to an output queue drained by
    a dedicated push task. System frames bypass both queues and are processed
    or forwarded immediately.

    Both tasks (and their queues) are created when the ``StartFrame`` is
    processed and are recreated on every ``StartInterruptionFrame``, which
    discards whatever was still queued.
    """

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        metrics: Optional[FrameProcessorMetrics] = None,
        enable_watchdog_logging: Optional[bool] = None,
        watchdog_timeout_secs: Optional[float] = None,
        **kwargs,
    ):
        """Initialize the processor.

        Args:
            name: Optional processor name, forwarded to ``BaseObject``.
            metrics: Metrics collector to use; a new ``FrameProcessorMetrics``
                is created when omitted.
            enable_watchdog_logging: Default watchdog-logging setting for tasks
                created with ``create_task()``.
            watchdog_timeout_secs: Default watchdog timeout for tasks created
                with ``create_task()``.
            **kwargs: Accepted for subclass convenience; not used here.
        """
        super().__init__(name=name)
        self._parent: Optional["FrameProcessor"] = None
        self._prev: Optional["FrameProcessor"] = None
        self._next: Optional["FrameProcessor"] = None

        # Default watchdog-logging setting for tasks created by this processor.
        self._enable_watchdog_logging = enable_watchdog_logging

        # Default watchdog timeout for tasks created by this processor.
        self._watchdog_timeout = watchdog_timeout_secs

        # Clock, task manager and observer are provided later by setup().
        self._clock: Optional[BaseClock] = None
        self._task_manager: Optional[BaseTaskManager] = None
        self._observer: Optional[BaseObserver] = None

        # Settings copied from the StartFrame (see __start()).
        self._allow_interruptions = False
        self._enable_metrics = False
        self._enable_usage_metrics = False
        self._report_only_initial_ttfb = False
        self._interruption_strategies: List[BaseInterruptionStrategy] = []

        # Indicates whether we have received the StartFrame.
        self.__started = False

        # Cancellation is done through CancelFrame (a system frame). This could
        # cause other events to be triggered (e.g. closing a transport) which
        # could also cause other frames to be pushed from other tasks
        # (e.g. EndFrame). So, when we are cancelling we don't want anything
        # else to be processed.
        self._cancelling = False

        # Metrics
        self._metrics = metrics or FrameProcessorMetrics()
        self._metrics.set_processor_name(self.name)

        # Processors have an input queue. The input queue will be processed
        # immediately (default) or it will block if `pause_processing_frames()`
        # is called. To resume processing frames we need to call
        # `resume_processing_frames()` which will wake up the event.
        self.__should_block_frames = False
        self.__input_event = asyncio.Event()
        self.__input_frame_task: Optional[asyncio.Task] = None

        # Every processor in Pipecat should only output frames from a single
        # task. This avoids problems like audio overlapping. System frames are
        # the exception to this rule. This is that task.
        self.__push_frame_task: Optional[asyncio.Task] = None

    @property
    def id(self) -> int:
        """Identifier stored by ``BaseObject`` (``self._id``)."""
        return self._id

    @property
    def name(self) -> str:
        """Processor name stored by ``BaseObject`` (``self._name``)."""
        return self._name

    @property
    def interruptions_allowed(self):
        """Whether the StartFrame allowed interruptions."""
        return self._allow_interruptions

    @property
    def metrics_enabled(self):
        """Whether the StartFrame enabled TTFB/processing metrics."""
        return self._enable_metrics

    @property
    def usage_metrics_enabled(self):
        """Whether the StartFrame enabled usage (LLM/TTS) metrics."""
        return self._enable_usage_metrics

    @property
    def report_only_initial_ttfb(self):
        """Whether the StartFrame asked to report only the initial TTFB."""
        return self._report_only_initial_ttfb

    @property
    def interruption_strategies(self) -> Sequence[BaseInterruptionStrategy]:
        """Interruption strategies copied from the StartFrame."""
        return self._interruption_strategies

    def can_generate_metrics(self) -> bool:
        """Return whether this processor produces metrics.

        The base implementation returns ``False``, which turns every
        ``start_*``/``stop_*`` metrics helper below into a no-op. Override to
        opt in.
        """
        return False

    def set_core_metrics_data(self, data: MetricsData):
        """Forward ``data`` to the metrics collector's core metrics data."""
        self._metrics.set_core_metrics_data(data)

    async def start_ttfb_metrics(self):
        """Start a TTFB measurement if metrics are supported and enabled."""
        if self.can_generate_metrics() and self.metrics_enabled:
            await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)

    async def stop_ttfb_metrics(self):
        """Stop the TTFB measurement and push the resulting frame, if any."""
        if self.can_generate_metrics() and self.metrics_enabled:
            frame = await self._metrics.stop_ttfb_metrics()
            if frame:
                await self.push_frame(frame)

    async def start_processing_metrics(self):
        """Start a processing-time measurement if metrics are enabled."""
        if self.can_generate_metrics() and self.metrics_enabled:
            await self._metrics.start_processing_metrics()

    async def stop_processing_metrics(self):
        """Stop the processing-time measurement and push its frame, if any."""
        if self.can_generate_metrics() and self.metrics_enabled:
            frame = await self._metrics.stop_processing_metrics()
            if frame:
                await self.push_frame(frame)

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
        """Report LLM token usage and push the resulting frame, if any.

        Args:
            tokens: Token usage to report.
        """
        if self.can_generate_metrics() and self.usage_metrics_enabled:
            frame = await self._metrics.start_llm_usage_metrics(tokens)
            if frame:
                await self.push_frame(frame)

    async def start_tts_usage_metrics(self, text: str):
        """Report TTS usage for ``text`` and push the resulting frame, if any.

        Args:
            text: Text that was synthesized.
        """
        if self.can_generate_metrics() and self.usage_metrics_enabled:
            frame = await self._metrics.start_tts_usage_metrics(text)
            if frame:
                await self.push_frame(frame)

    async def stop_all_metrics(self):
        """Stop both the TTFB and the processing-time measurements."""
        await self.stop_ttfb_metrics()
        await self.stop_processing_metrics()

    def create_task(
        self,
        coroutine: Coroutine,
        name: Optional[str] = None,
        *,
        enable_watchdog_logging: Optional[bool] = None,
        watchdog_timeout_secs: Optional[float] = None,
    ) -> asyncio.Task:
        """Create a task for ``coroutine`` through the task manager.

        The task name is prefixed with this processor (``"<self>::<name>"``);
        when ``name`` is omitted the coroutine's function name is used.

        Args:
            coroutine: Coroutine to run.
            name: Optional task name suffix.
            enable_watchdog_logging: Per-task override; ``None`` means use the
                processor default given at construction. An explicit ``False``
                is honored.
            watchdog_timeout_secs: Per-task override; ``None`` means use the
                processor default given at construction.

        Returns:
            The created task.

        Raises:
            Exception: If ``setup()`` has not provided a task manager yet.
        """
        if name:
            name = f"{self}::{name}"
        else:
            name = f"{self}::{coroutine.cr_code.co_name}"
        # Fall back to the processor defaults only when the caller did not
        # specify a value; a truthiness test would discard an explicit False/0.
        if enable_watchdog_logging is None:
            enable_watchdog_logging = self._enable_watchdog_logging
        if watchdog_timeout_secs is None:
            watchdog_timeout_secs = self._watchdog_timeout
        return self.get_task_manager().create_task(
            coroutine,
            name,
            enable_watchdog_logging=enable_watchdog_logging,
            watchdog_timeout=watchdog_timeout_secs,
        )

    async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
        """Cancel ``task`` through the task manager."""
        await self.get_task_manager().cancel_task(task, timeout)

    async def wait_for_task(self, task: asyncio.Task, timeout: Optional[float] = None):
        """Wait for ``task`` through the task manager."""
        await self.get_task_manager().wait_for_task(task, timeout)

    def start_watchdog(self):
        """Start the task manager's watchdog for the current asyncio task."""
        self.get_task_manager().start_watchdog(asyncio.current_task())

    def reset_watchdog(self):
        """Reset the task manager's watchdog for the current asyncio task."""
        self.get_task_manager().reset_watchdog(asyncio.current_task())

    async def setup(self, setup: FrameProcessorSetup):
        """Store the clock, task manager and observer and set up metrics.

        Args:
            setup: Runtime dependencies for this processor.
        """
        self._clock = setup.clock
        self._task_manager = setup.task_manager
        self._observer = setup.observer
        if self._metrics is not None:
            await self._metrics.setup(self._task_manager)

    async def cleanup(self):
        """Cancel the input/push tasks and clean up metrics."""
        await super().cleanup()
        await self.__cancel_input_task()
        await self.__cancel_push_task()
        if self._metrics is not None:
            await self._metrics.cleanup()

    def link(self, processor: "FrameProcessor"):
        """Make ``processor`` the next (downstream) processor of this one.

        Also sets ``processor``'s previous pointer back to this processor.
        """
        self._next = processor
        processor._prev = self
        logger.debug(f"Linking {self} -> {self._next}")

    def get_event_loop(self) -> asyncio.AbstractEventLoop:
        """Return the task manager's event loop."""
        return self.get_task_manager().get_event_loop()

    def set_parent(self, parent: "FrameProcessor"):
        """Set the parent processor."""
        self._parent = parent

    def get_parent(self) -> Optional["FrameProcessor"]:
        """Return the parent processor, or ``None`` if not set."""
        return self._parent

    def get_clock(self) -> BaseClock:
        """Return the clock set by ``setup()``.

        Raises:
            Exception: If ``setup()`` has not been called yet.
        """
        if not self._clock:
            raise Exception(f"{self} Clock is still not initialized.")
        return self._clock

    def get_task_manager(self) -> BaseTaskManager:
        """Return the task manager set by ``setup()``.

        Raises:
            Exception: If ``setup()`` has not been called yet.
        """
        if not self._task_manager:
            raise Exception(f"{self} TaskManager is still not initialized.")
        return self._task_manager

    async def queue_frame(
        self,
        frame: Frame,
        direction: FrameDirection = FrameDirection.DOWNSTREAM,
        callback: Optional[
            Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
        ] = None,
    ):
        """Deliver ``frame`` to this processor.

        System frames are processed immediately in the caller's task; every
        other frame is queued for the input task. Frames are ignored once a
        CancelFrame has been handled.

        Args:
            frame: Frame to deliver.
            direction: Direction the frame is travelling.
            callback: Optional coroutine function awaited with
                ``(self, frame, direction)`` after the frame was processed
                successfully, for both system and queued frames.
        """
        # If we are cancelling we don't want to process any other frame.
        if self._cancelling:
            return

        if isinstance(frame, SystemFrame):
            # We don't want to queue system frames.
            await self.process_frame(frame, direction)
            # Honour the callback here too; previously it was silently dropped
            # for system frames while queued frames invoked it.
            if callback:
                await callback(self, frame, direction)
        else:
            # We queue everything else.
            await self.__input_queue.put((frame, direction, callback))

    async def pause_processing_frames(self):
        """Make the input task block before taking its next queued frame."""
        logger.trace(f"{self}: pausing frame processing")
        # Discard a resume signal left over from an earlier, unmatched
        # resume_processing_frames(); otherwise the input task would wake
        # immediately and this pause would have no effect.
        self.__input_event.clear()
        self.__should_block_frames = True

    async def resume_processing_frames(self):
        """Wake the input task if it is blocked by a pause."""
        logger.trace(f"{self}: resuming frame processing")
        self.__input_event.set()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Handle the lifecycle and control frames known to the base class.

        Handles StartFrame, StartInterruptionFrame, StopInterruptionFrame,
        CancelFrame and the pause/resume frames; other frames are ignored here.

        Args:
            frame: Frame to process.
            direction: Direction the frame is travelling.
        """
        if isinstance(frame, StartFrame):
            await self.__start(frame)
        elif isinstance(frame, StartInterruptionFrame):
            await self._start_interruption()
            await self.stop_all_metrics()
        elif isinstance(frame, StopInterruptionFrame):
            # Not read anywhere in this class; presumably consumed by
            # subclasses (NOTE(review): confirm before removing).
            self._should_report_ttfb = True
        elif isinstance(frame, CancelFrame):
            await self.__cancel(frame)
        elif isinstance(frame, (FrameProcessorPauseFrame, FrameProcessorPauseUrgentFrame)):
            await self.__pause(frame)
        elif isinstance(frame, (FrameProcessorResumeFrame, FrameProcessorResumeUrgentFrame)):
            await self.__resume(frame)

    async def push_error(self, error: ErrorFrame):
        """Push ``error`` upstream."""
        await self.push_frame(error, FrameDirection.UPSTREAM)

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Send ``frame`` to the adjacent processor in ``direction``.

        System frames are forwarded immediately; other frames are queued for
        the push task so they leave this processor in order. Frames pushed
        before the StartFrame was received are logged and dropped.
        """
        if not self._check_started(frame):
            return

        if isinstance(frame, SystemFrame):
            await self.__internal_push_frame(frame, direction)
        else:
            await self.__push_queue.put((frame, direction))

    async def __start(self, frame: StartFrame):
        # Copy the pipeline-wide settings and start the input/push tasks.
        self.__started = True
        self._allow_interruptions = frame.allow_interruptions
        self._enable_metrics = frame.enable_metrics
        self._enable_usage_metrics = frame.enable_usage_metrics
        self._report_only_initial_ttfb = frame.report_only_initial_ttfb
        self._interruption_strategies = frame.interruption_strategies
        self.__create_input_task()
        self.__create_push_task()

    async def __cancel(self, frame: CancelFrame):
        # From now on queue_frame() ignores everything.
        self._cancelling = True
        await self.__cancel_input_task()
        await self.__cancel_push_task()

    async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame):
        # Only react if the frame targets this processor (matched by name).
        if frame.processor.name == self.name:
            await self.pause_processing_frames()

    async def __resume(self, frame: FrameProcessorResumeFrame | FrameProcessorResumeUrgentFrame):
        # Only react if the frame targets this processor (matched by name).
        if frame.processor.name == self.name:
            await self.resume_processing_frames()

    #
    # Handle interruptions
    #

    async def _start_interruption(self):
        """Drop all queued input/output frames by restarting both tasks."""
        try:
            # Cancel the push frame task. This will stop pushing frames downstream.
            await self.__cancel_push_task()

            # Cancel the input task. This will stop processing queued frames.
            await self.__cancel_input_task()
        except Exception as e:
            logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
            await self.push_error(ErrorFrame(str(e)))

        # Create a new input queue and task.
        self.__create_input_task()

        # Create a new output queue and task.
        self.__create_push_task()

    async def _stop_interruption(self):
        """Hook for interruption end; nothing to do in the base class."""
        # Nothing to do right now.
        pass

    async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
        # Hand the frame to the neighbour in `direction`, notifying the
        # observer first. With no neighbour in that direction the frame is
        # dropped. Errors are logged and reported upstream as ErrorFrame.
        try:
            timestamp = self._clock.get_time() if self._clock else 0
            if direction == FrameDirection.DOWNSTREAM and self._next:
                logger.trace(f"Pushing {frame} from {self} to {self._next}")
                if self._observer:
                    data = FramePushed(
                        source=self,
                        destination=self._next,
                        frame=frame,
                        direction=direction,
                        timestamp=timestamp,
                    )
                    await self._observer.on_push_frame(data)
                await self._next.queue_frame(frame, direction)
            elif direction == FrameDirection.UPSTREAM and self._prev:
                logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
                if self._observer:
                    data = FramePushed(
                        source=self,
                        destination=self._prev,
                        frame=frame,
                        direction=direction,
                        timestamp=timestamp,
                    )
                    await self._observer.on_push_frame(data)
                await self._prev.queue_frame(frame, direction)
        except Exception as e:
            logger.exception(f"Uncaught exception in {self}: {e}")
            await self.push_error(ErrorFrame(str(e)))

    def _check_started(self, frame: Frame):
        """Return whether the StartFrame was received; log an error if not."""
        if not self.__started:
            logger.error(f"{self} Trying to process {frame} but StartFrame not received yet")
        return self.__started

    def __create_input_task(self):
        # Fresh, unpaused input queue + task; no-op if a task already exists.
        if not self.__input_frame_task:
            self.__should_block_frames = False
            self.__input_event.clear()
            self.__input_queue = asyncio.Queue()
            self.__input_frame_task = self.create_task(self.__input_frame_task_handler())

    async def __cancel_input_task(self):
        if self.__input_frame_task:
            await self.cancel_task(self.__input_frame_task)
            self.__input_frame_task = None

    async def __input_frame_task_handler(self):
        # Process queued frames one at a time, honouring pause/resume.
        while True:
            if self.__should_block_frames:
                logger.trace(f"{self}: frame processing paused")
                await self.__input_event.wait()
                self.__input_event.clear()
                self.__should_block_frames = False
                logger.trace(f"{self}: frame processing resumed")

            (frame, direction, callback) = await self.__input_queue.get()

            try:
                self.start_watchdog()
                # Process the frame.
                await self.process_frame(frame, direction)
                # If this frame has an associated callback, call it now.
                if callback:
                    await callback(self, frame, direction)
            except Exception as e:
                logger.exception(f"{self}: error processing frame: {e}")
                await self.push_error(ErrorFrame(str(e)))
            finally:
                self.__input_queue.task_done()
                self.reset_watchdog()

    def __create_push_task(self):
        # Fresh output queue + task; no-op if a task already exists.
        if not self.__push_frame_task:
            self.__push_queue = asyncio.Queue()
            self.__push_frame_task = self.create_task(self.__push_frame_task_handler())

    async def __cancel_push_task(self):
        if self.__push_frame_task:
            await self.cancel_task(self.__push_frame_task)
            self.__push_frame_task = None

    async def __push_frame_task_handler(self):
        # Forward queued output frames in order from this single task.
        while True:
            (frame, direction) = await self.__push_queue.get()
            try:
                self.start_watchdog()
                await self.__internal_push_frame(frame, direction)
            finally:
                # Mirror the input handler: always account for the dequeued
                # item and reset the watchdog, even if the push is interrupted.
                self.__push_queue.task_done()
                self.reset_watchdog()