FrameProcessor

class pipecat.processors.frame_processor.FrameDirection(*values)[source]

Bases: Enum

DOWNSTREAM = 1
UPSTREAM = 2
class pipecat.processors.frame_processor.FrameProcessorSetup(clock: pipecat.clocks.base_clock.BaseClock, task_manager: pipecat.utils.asyncio.BaseTaskManager, observer: pipecat.observers.base_observer.BaseObserver | None = None)[source]

Bases: object

Parameters:
  • clock (BaseClock)

  • task_manager (BaseTaskManager)

  • observer (BaseObserver | None)

clock: BaseClock
task_manager: BaseTaskManager
observer: BaseObserver | None = None
class pipecat.processors.frame_processor.FrameProcessor(*, name=None, metrics=None, enable_watchdog_logging=None, watchdog_timeout_secs=None, **kwargs)[source]

Bases: BaseObject

Parameters:
  • name (str | None)

  • metrics (FrameProcessorMetrics | None)

  • enable_watchdog_logging (bool | None)

  • watchdog_timeout_secs (float | None)

property id: int
property name: str
property interruptions_allowed
property metrics_enabled
property usage_metrics_enabled
property report_only_initial_ttfb
property interruption_strategies: Sequence[BaseInterruptionStrategy]
can_generate_metrics()[source]
Return type: bool

set_core_metrics_data(data)[source]
Parameters:
  • data (MetricsData)

async start_ttfb_metrics()[source]
async stop_ttfb_metrics()[source]
async start_processing_metrics()[source]
async stop_processing_metrics()[source]
async start_llm_usage_metrics(tokens)[source]
Parameters:
  • tokens (LLMTokenUsage)

async start_tts_usage_metrics(text)[source]
Parameters:
  • text (str)

async stop_all_metrics()[source]
create_task(coroutine, name=None, *, enable_watchdog_logging=None, watchdog_timeout_secs=None)[source]
Parameters:
  • coroutine (Coroutine)

  • name (str | None)

  • enable_watchdog_logging (bool | None)

  • watchdog_timeout_secs (float | None)

Return type: Task

async cancel_task(task, timeout=None)[source]
Parameters:
  • task (Task)

  • timeout (float | None)

async wait_for_task(task, timeout=None)[source]
Parameters:
  • task (Task)

  • timeout (float | None)

start_watchdog()[source]
reset_watchdog()[source]
async setup(setup)[source]
Parameters:
  • setup (FrameProcessorSetup)

async cleanup()[source]
link(processor)[source]
Parameters:
  • processor (FrameProcessor)

get_event_loop()[source]
Return type: AbstractEventLoop

set_parent(parent)[source]
Parameters:
  • parent (FrameProcessor)

get_parent()[source]
Return type: FrameProcessor | None

get_clock()[source]
Return type: BaseClock

get_task_manager()[source]
Return type: BaseTaskManager

async queue_frame(frame, direction=FrameDirection.DOWNSTREAM, callback=None)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)

  • callback (Callable[[FrameProcessor, Frame, FrameDirection], Awaitable[None]] | None)

async pause_processing_frames()[source]
async resume_processing_frames()[source]
async process_frame(frame, direction)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)

async push_error(error)[source]
Parameters:
  • error (ErrorFrame)

async push_frame(frame, direction=FrameDirection.DOWNSTREAM)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)