Task

class pipecat.pipeline.task.PipelineParams(*, allow_interruptions=True, audio_in_sample_rate=16000, audio_out_sample_rate=24000, enable_heartbeats=False, enable_metrics=False, enable_usage_metrics=False, heartbeats_period_secs=1.0, interruption_strategies=<factory>, observers=<factory>, report_only_initial_ttfb=False, send_initial_empty_metrics=True, start_metadata=<factory>)[source]

Bases: BaseModel

Configuration parameters for pipeline execution. These parameters are usually passed to all frame processors through the StartFrame. For other generic pipeline task parameters, use the PipelineTask constructor arguments instead.

Parameters:
  • allow_interruptions (bool)

  • audio_in_sample_rate (int)

  • audio_out_sample_rate (int)

  • enable_heartbeats (bool)

  • enable_metrics (bool)

  • enable_usage_metrics (bool)

  • heartbeats_period_secs (float)

  • interruption_strategies (List[BaseInterruptionStrategy])

  • observers (List[BaseObserver])

  • report_only_initial_ttfb (bool)

  • send_initial_empty_metrics (bool)

  • start_metadata (Dict[str, Any])
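
The defaults in the signature above can be read as follows. This is only a stand-in sketch built with a standard-library dataclass so that it runs without pipecat installed; `PipelineParamsSketch` is a hypothetical name, the real class is a pydantic BaseModel, and the assumption that each `<factory>` default produces an empty list or dict is not confirmed by this page.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class PipelineParamsSketch:
    """Hypothetical stand-in mirroring the defaults in the PipelineParams signature."""

    allow_interruptions: bool = True
    audio_in_sample_rate: int = 16000
    audio_out_sample_rate: int = 24000
    enable_heartbeats: bool = False
    enable_metrics: bool = False
    enable_usage_metrics: bool = False
    heartbeats_period_secs: float = 1.0
    # Assumption: the <factory> defaults build a fresh empty container
    # per instance, so instances never share mutable state.
    interruption_strategies: List[Any] = field(default_factory=list)
    observers: List[Any] = field(default_factory=list)
    report_only_initial_ttfb: bool = False
    send_initial_empty_metrics: bool = True
    start_metadata: Dict[str, Any] = field(default_factory=dict)


# Override only the values that differ from the defaults.
params = PipelineParamsSketch(
    audio_in_sample_rate=8000,
    enable_metrics=True,
    start_metadata={"session": "demo"},
)
default_params = PipelineParamsSketch()
```

With the real class, the same keyword arguments would be passed to PipelineParams and the resulting object handed to PipelineTask through its params argument.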

allow_interruptions

Whether to allow pipeline interruptions.

Type:

bool

audio_in_sample_rate

Input audio sample rate in Hz.

Type:

int

audio_out_sample_rate

Output audio sample rate in Hz.

Type:

int

enable_heartbeats

Whether to enable heartbeat monitoring.

Type:

bool

enable_metrics

Whether to enable metrics collection.

Type:

bool

enable_usage_metrics

Whether to enable usage metrics.

Type:

bool

heartbeats_period_secs

Period between heartbeats in seconds.

Type:

float

observers

[deprecated] Use the observers argument of the PipelineTask constructor instead.

Type:

List[pipecat.observers.base_observer.BaseObserver]

report_only_initial_ttfb

Whether to report only initial time to first byte.

Type:

bool

send_initial_empty_metrics

Whether to send initial empty metrics.

Type:

bool

start_metadata

Additional metadata for pipeline start.

Type:

Dict[str, Any]

interruption_strategies

Strategies for bot interruption behavior.

Type:

List[pipecat.audio.interruptions.base_interruption_strategy.BaseInterruptionStrategy]

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class pipecat.pipeline.task.PipelineTaskSource(up_queue, **kwargs)[source]

Bases: FrameProcessor

Source processor for pipeline tasks that handles frame routing.

This is the source processor that is linked at the beginning of the pipeline given to the pipeline task. It allows us to easily push frames downstream to the pipeline and also receive upstream frames coming from the pipeline.

Parameters:

up_queue (Queue) – Queue for upstream frame processing.

async process_frame(frame, direction)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)

class pipecat.pipeline.task.PipelineTaskSink(down_queue, **kwargs)[source]

Bases: FrameProcessor

Sink processor for pipeline tasks that handles final frame processing.

This is the sink processor that is linked at the end of the pipeline given to the pipeline task. It allows us to receive downstream frames and act on them, for example, waiting to receive an EndFrame.

Parameters:

down_queue (Queue) – Queue for downstream frame processing.

async process_frame(frame, direction)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)
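
The routing role of the two processors described above can be sketched with plain asyncio queues. This is a rough illustration, not pipecat's implementation: `Direction`, `SourceSketch`, and `SinkSketch` are hypothetical stand-ins, and the assumption is that the source forwards upstream frames to `up_queue` while the sink forwards downstream frames to `down_queue`.

```python
import asyncio
from enum import Enum


class Direction(Enum):
    """Hypothetical stand-in for FrameDirection."""

    DOWNSTREAM = 1
    UPSTREAM = 2


class SinkSketch:
    """Sits at the end of the pipeline and collects downstream frames."""

    def __init__(self, down_queue: asyncio.Queue):
        self._down_queue = down_queue

    async def process_frame(self, frame, direction):
        # A frame that travelled through the whole pipeline lands here,
        # so the task can act on it (for example, wait for an end frame).
        await self._down_queue.put(frame)


class SourceSketch:
    """Sits at the start of the pipeline and routes frames by direction."""

    def __init__(self, up_queue: asyncio.Queue, next_processor):
        self._up_queue = up_queue
        self._next = next_processor

    async def process_frame(self, frame, direction):
        if direction is Direction.UPSTREAM:
            # The frame came back up to the start: hand it to the task.
            await self._up_queue.put(frame)
        else:
            # The frame is entering the pipeline: pass it downstream.
            await self._next.process_frame(frame, direction)


async def demo():
    up_queue, down_queue = asyncio.Queue(), asyncio.Queue()
    sink = SinkSketch(down_queue)
    # With no processors in between, the source is linked straight to the sink.
    source = SourceSketch(up_queue, next_processor=sink)

    await source.process_frame("hello", Direction.DOWNSTREAM)
    await source.process_frame("error report", Direction.UPSTREAM)
    return down_queue.get_nowait(), up_queue.get_nowait()


result = asyncio.run(demo())
```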

class pipecat.pipeline.task.PipelineTask(pipeline, *, params=None, additional_span_attributes=None, cancel_on_idle_timeout=True, check_dangling_tasks=True, clock=None, conversation_id=None, enable_tracing=False, enable_turn_tracking=True, enable_watchdog_logging=False, idle_timeout_frames=(<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.LLMFullResponseEndFrame'>), idle_timeout_secs=300, observers=None, task_manager=None, watchdog_timeout_secs=5.0)[source]

Bases: BasePipelineTask

Manages the execution of a pipeline, handling frame processing and task lifecycle.

It provides two event handlers, on_frame_reached_upstream and on_frame_reached_downstream, which are called when an upstream frame reaches the start of the pipeline or a downstream frame reaches the end of it. By default, these handlers are not called unless filters are set using set_reached_upstream_filter and set_reached_downstream_filter.

    @task.event_handler("on_frame_reached_upstream")
    async def on_frame_reached_upstream(task, frame):
        ...

    @task.event_handler("on_frame_reached_downstream")
    async def on_frame_reached_downstream(task, frame):
        ...

It also has an event handler that detects when the pipeline is idle. By default, a pipeline is considered idle if neither a BotSpeakingFrame nor an LLMFullResponseEndFrame is received within idle_timeout_secs.

    @task.event_handler("on_idle_timeout")
    async def on_pipeline_idle_timeout(task):
        ...
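
The idle rule described above amounts to a timer that is reset only by certain frame types. The following is a minimal sketch of that idea using asyncio, not the library's own monitor: `watch_idle`, `OtherFrame`, and the `None` sentinel are hypothetical, and `BotSpeakingFrame` here is an empty stand-in class rather than the real frame.

```python
import asyncio


class BotSpeakingFrame:
    """Stand-in for a frame type that counts as activity."""


class OtherFrame:
    """Stand-in for a frame type that does not count as activity."""


async def watch_idle(frames: asyncio.Queue, idle_types, timeout_secs: float) -> bool:
    """Return True if no frame of idle_types arrives within timeout_secs.

    Return False if the None sentinel arrives first (pipeline finished).
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_secs
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            return True
        try:
            frame = await asyncio.wait_for(frames.get(), timeout=remaining)
        except asyncio.TimeoutError:
            return True
        if frame is None:
            return False
        if isinstance(frame, idle_types):
            # Only the listed frame types push the deadline forward.
            deadline = loop.time() + timeout_secs


async def demo():
    # Unlisted frames do not reset the timer, so this pipeline goes idle.
    quiet = asyncio.Queue()
    await quiet.put(OtherFrame())
    went_idle = await watch_idle(quiet, (BotSpeakingFrame,), timeout_secs=0.05)

    # This pipeline finishes before the timeout elapses.
    active = asyncio.Queue()
    await active.put(BotSpeakingFrame())
    await active.put(None)
    still_idle = await watch_idle(active, (BotSpeakingFrame,), timeout_secs=0.05)
    return went_idle, still_idle


idle_results = asyncio.run(demo())
```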

There are also events to know if a pipeline has been started, stopped, ended or cancelled.

    @task.event_handler("on_pipeline_started")
    async def on_pipeline_started(task, frame: StartFrame):
        ...

    @task.event_handler("on_pipeline_stopped")
    async def on_pipeline_stopped(task, frame: StopFrame):
        ...

    @task.event_handler("on_pipeline_ended")
    async def on_pipeline_ended(task, frame: EndFrame):
        ...

    @task.event_handler("on_pipeline_cancelled")
    async def on_pipeline_cancelled(task, frame: CancelFrame):
        ...

Parameters:
  • pipeline (BasePipeline) – The pipeline to execute.

  • params (PipelineParams | None) – Configuration parameters for the pipeline.

  • additional_span_attributes (dict | None) – Optional dictionary of attributes to propagate as OpenTelemetry conversation span attributes.

  • cancel_on_idle_timeout (bool) – Whether the pipeline task should be cancelled if the idle timeout is reached.

  • check_dangling_tasks (bool) – Whether to check that processors’ tasks finish properly.

  • clock (BaseClock | None) – Clock implementation for timing operations.

  • conversation_id (str | None) – Optional custom ID for the conversation.

  • enable_tracing (bool) – Whether to enable tracing.

  • enable_turn_tracking (bool) – Whether to enable turn tracking.

  • enable_watchdog_logging (bool) – Whether to print task processing times.

  • idle_timeout_frames (Tuple[Type[Frame], ...]) – A tuple of frame types; the idle timeout is triggered if none of them is received within idle_timeout_secs.

  • idle_timeout_secs (float | None) – Timeout (in seconds) after which the pipeline is considered idle, or None to disable the idle timeout. If the pipeline becomes idle and cancel_on_idle_timeout is set, the pipeline task is cancelled automatically.

  • observers (List[BaseObserver] | None) – List of observers for monitoring pipeline execution.

  • watchdog_timeout_secs (float) – Watchdog timer timeout (in seconds). A warning will be logged if the watchdog timer is not reset before this timeout.

  • task_manager (BaseTaskManager | None)

property params: PipelineParams

Returns the pipeline parameters of this task.

property turn_tracking_observer: TurnTrackingObserver | None

Return the turn tracking observer if enabled.

property turn_trace_observer: TurnTraceObserver | None

Return the turn trace observer if enabled.

add_observer(observer)[source]
Parameters:

observer (BaseObserver)

async remove_observer(observer)[source]
Parameters:

observer (BaseObserver)

set_reached_upstream_filter(types)[source]

Sets the frame types that are checked before calling the on_frame_reached_upstream event handler. The handler is only called for frames that match one of these types.

Parameters:

types (Tuple[Type[Frame], ...])

set_reached_downstream_filter(types)[source]

Sets the frame types that are checked before calling the on_frame_reached_downstream event handler. The handler is only called for frames that match one of these types.

Parameters:

types (Tuple[Type[Frame], ...])
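
The effect of the two filter setters can be illustrated with a small stand-in. This sketch assumes the filter is a tuple of types tested with `isinstance` and that it starts out empty; `ReachedFilterSketch`, `frame_reached_downstream`, and the frame classes are hypothetical names, not pipecat's.

```python
import asyncio


class Frame:
    """Hypothetical base frame."""


class TextFrame(Frame):
    """Hypothetical frame type used in the filter."""


class AudioFrame(Frame):
    """Hypothetical frame type left out of the filter."""


class ReachedFilterSketch:
    def __init__(self):
        # An empty tuple matches nothing, so handlers stay silent by default.
        self._downstream_filter = ()
        self._handlers = {}

    def event_handler(self, name):
        def decorator(func):
            self._handlers.setdefault(name, []).append(func)
            return func

        return decorator

    def set_reached_downstream_filter(self, types):
        self._downstream_filter = types

    async def frame_reached_downstream(self, frame):
        # isinstance() against an empty tuple is always False.
        if isinstance(frame, self._downstream_filter):
            for handler in self._handlers.get("on_frame_reached_downstream", []):
                await handler(self, frame)


async def demo():
    task = ReachedFilterSketch()
    seen = []

    @task.event_handler("on_frame_reached_downstream")
    async def on_frame_reached_downstream(task, frame):
        seen.append(type(frame).__name__)

    await task.frame_reached_downstream(TextFrame())   # no filter set: ignored
    task.set_reached_downstream_filter((TextFrame,))
    await task.frame_reached_downstream(TextFrame())   # matches the filter
    await task.frame_reached_downstream(AudioFrame())  # not in the filter
    return seen


seen_frames = asyncio.run(demo())
```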

has_finished()[source]

Indicates whether the task has finished. That is, all processors have stopped.

Return type:

bool

async stop_when_done()[source]

Helper that sends an EndFrame to the pipeline so that the task stops once everything already queued has been processed.

async cancel()[source]

Stops the running pipeline immediately.
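
The difference between stopping when done and cancelling can be sketched with a queue-driven loop. This is an analogy under stated assumptions rather than pipecat code: `run_pipeline` is a hypothetical consumer, `EndFrame` is an empty stand-in class, and cancellation is modelled with plain asyncio task cancellation.

```python
import asyncio


class EndFrame:
    """Stand-in for the frame that marks a graceful end."""


async def run_pipeline(queue: asyncio.Queue, processed: list) -> str:
    while True:
        frame = await queue.get()
        if isinstance(frame, EndFrame):
            return "ended"
        await asyncio.sleep(0.01)  # simulate per-frame work
        processed.append(frame)


async def demo():
    # Graceful stop: the end marker queues up behind pending frames,
    # so all of them are processed before the loop returns.
    queue, processed = asyncio.Queue(), []
    for item in ("a", "b", "c"):
        queue.put_nowait(item)
    queue.put_nowait(EndFrame())
    outcome = await run_pipeline(queue, processed)

    # Immediate stop: cancelling the running task abandons pending frames.
    queue2, processed2 = asyncio.Queue(), []
    for item in ("a", "b", "c"):
        queue2.put_nowait(item)
    runner = asyncio.create_task(run_pipeline(queue2, processed2))
    await asyncio.sleep(0)  # give the runner a chance to start
    runner.cancel()
    try:
        await runner
    except asyncio.CancelledError:
        pass
    return outcome, processed, runner.cancelled(), processed2


outcome, graceful_items, was_cancelled, cancelled_items = asyncio.run(demo())
```

In the graceful case every queued item is handled before the loop exits; in the cancelled case the loop is interrupted before it can drain the queue.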

async run(params)[source]

Starts and manages the pipeline execution until completion or cancellation.

Parameters:

params (PipelineTaskParams)

async queue_frame(frame)[source]

Queues a single frame to be pushed down the pipeline.

Parameters:

frame (Frame) – The frame to be processed.

async queue_frames(frames)[source]

Queues multiple frames to be pushed down the pipeline.

Parameters:

frames (Iterable[Frame] | AsyncIterable[Frame]) – An iterable or async iterable of frames to be processed.
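
Accepting both an iterable and an async iterable, as the parameter type above describes, might be handled along these lines. This is a sketch with a hypothetical `QueueSketch` class and plain strings in place of frames; it is not the library's implementation.

```python
import asyncio
from collections.abc import AsyncIterable, Iterable


class QueueSketch:
    def __init__(self):
        self._queue = asyncio.Queue()

    async def queue_frame(self, frame):
        await self._queue.put(frame)

    async def queue_frames(self, frames):
        # Async iterables (such as async generators) need "async for";
        # ordinary iterables (lists, tuples, generators) use a plain loop.
        if isinstance(frames, AsyncIterable):
            async for frame in frames:
                await self.queue_frame(frame)
        elif isinstance(frames, Iterable):
            for frame in frames:
                await self.queue_frame(frame)
        else:
            raise TypeError("frames must be an iterable or an async iterable")

    def drain(self):
        """Return everything queued so far, in order."""
        items = []
        while not self._queue.empty():
            items.append(self._queue.get_nowait())
        return items


async def generated_frames():
    for text in ("c", "d"):
        yield text


async def demo():
    task = QueueSketch()
    await task.queue_frames(["a", "b"])          # regular iterable
    await task.queue_frames(generated_frames())  # async iterable
    return task.drain()


queued = asyncio.run(demo())
```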