Task
- class pipecat.pipeline.task.PipelineParams(*, allow_interruptions=True, audio_in_sample_rate=16000, audio_out_sample_rate=24000, enable_heartbeats=False, enable_metrics=False, enable_usage_metrics=False, heartbeats_period_secs=1.0, interruption_strategies=<factory>, observers=<factory>, report_only_initial_ttfb=False, send_initial_empty_metrics=True, start_metadata=<factory>)[source]
Bases:
BaseModel
Configuration parameters for pipeline execution. These parameters are usually passed to all frame processors using through StartFrame. For other generic pipeline task parameters use PipelineTask constructor arguments instead.
- Parameters:
allow_interruptions (bool)
audio_in_sample_rate (int)
audio_out_sample_rate (int)
enable_heartbeats (bool)
enable_metrics (bool)
enable_usage_metrics (bool)
heartbeats_period_secs (float)
interruption_strategies (List[BaseInterruptionStrategy])
observers (List[BaseObserver])
report_only_initial_ttfb (bool)
send_initial_empty_metrics (bool)
start_metadata (Dict[str, Any])
- allow_interruptions
Whether to allow pipeline interruptions.
- Type:
bool
- audio_in_sample_rate
Input audio sample rate in Hz.
- Type:
int
- audio_out_sample_rate
Output audio sample rate in Hz.
- Type:
int
- enable_heartbeats
Whether to enable heartbeat monitoring.
- Type:
bool
- enable_metrics
Whether to enable metrics collection.
- Type:
bool
- enable_usage_metrics
Whether to enable usage metrics.
- Type:
bool
- heartbeats_period_secs
Period between heartbeats in seconds.
- Type:
float
- observers
[deprecated] Use observers arg in PipelineTask class.
- Type:
List[pipecat.observers.base_observer.BaseObserver]
- report_only_initial_ttfb
Whether to report only initial time to first byte.
- Type:
bool
- send_initial_empty_metrics
Whether to send initial empty metrics.
- Type:
bool
- start_metadata
Additional metadata for pipeline start.
- Type:
Dict[str, Any]
- interruption_strategies
Strategies for bot interruption behavior.
- Type:
List[pipecat.audio.interruptions.base_interruption_strategy.BaseInterruptionStrategy]
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- allow_interruptions: bool
- audio_in_sample_rate: int
- audio_out_sample_rate: int
- enable_heartbeats: bool
- enable_metrics: bool
- enable_usage_metrics: bool
- heartbeats_period_secs: float
- interruption_strategies: List[BaseInterruptionStrategy]
- observers: List[BaseObserver]
- report_only_initial_ttfb: bool
- send_initial_empty_metrics: bool
- start_metadata: Dict[str, Any]
- class pipecat.pipeline.task.PipelineTaskSource(up_queue, **kwargs)[source]
Bases:
FrameProcessor
Source processor for pipeline tasks that handles frame routing.
This is the source processor that is linked at the beginning of the pipeline given to the pipeline task. It allows us to easily push frames downstream to the pipeline and also receive upstream frames coming from the pipeline.
- Parameters:
up_queue (Queue) – Queue for upstream frame processing.
- async process_frame(frame, direction)[source]
- Parameters:
frame (Frame)
direction (FrameDirection)
- class pipecat.pipeline.task.PipelineTaskSink(down_queue, **kwargs)[source]
Bases:
FrameProcessor
Sink processor for pipeline tasks that handles final frame processing.
This is the sink processor that is linked at the end of the pipeline given to the pipeline task. It allows us to receive downstream frames and act on them, for example, waiting to receive an EndFrame.
- Parameters:
down_queue (Queue) – Queue for downstream frame processing.
- async process_frame(frame, direction)[source]
- Parameters:
frame (Frame)
direction (FrameDirection)
- class pipecat.pipeline.task.PipelineTask(pipeline, *, params=None, additional_span_attributes=None, cancel_on_idle_timeout=True, check_dangling_tasks=True, clock=None, conversation_id=None, enable_tracing=False, enable_turn_tracking=True, enable_watchdog_logging=False, idle_timeout_frames=(<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.LLMFullResponseEndFrame'>), idle_timeout_secs=300, observers=None, task_manager=None, watchdog_timeout_secs=5.0)[source]
Bases:
BasePipelineTask
Manages the execution of a pipeline, handling frame processing and task lifecycle.
It has a couple of event handlers on_frame_reached_upstream and on_frame_reached_downstream that are called when upstream frames or downstream frames reach both ends of pipeline. By default, the events handlers will not be called unless some filters are set using set_reached_upstream_filter and set_reached_downstream_filter.
@task.event_handler(“on_frame_reached_upstream”) async def on_frame_reached_upstream(task, frame):
…
@task.event_handler(“on_frame_reached_downstream”) async def on_frame_reached_downstream(task, frame):
…
It also has an event handler that detects when the pipeline is idle. By default, a pipeline is idle if no BotSpeakingFrame or LLMFullResponseEndFrame are received within idle_timeout_secs.
@task.event_handler(“on_idle_timeout”) async def on_pipeline_idle_timeout(task):
…
There are also events to know if a pipeline has been started, stopped, ended or cancelled.
@task.event_handler(“on_pipeline_started”) async def on_pipeline_started(task, frame: StartFrame):
…
@task.event_handler(“on_pipeline_stopped”) async def on_pipeline_stopped(task, frame: StopFrame):
…
@task.event_handler(“on_pipeline_ended”) async def on_pipeline_ended(task, frame: EndFrame):
…
@task.event_handler(“on_pipeline_cancelled”) async def on_pipeline_cancelled(task, frame: CancelFrame):
…
- Parameters:
pipeline (BasePipeline) – The pipeline to execute.
params (PipelineParams | None) – Configuration parameters for the pipeline.
additional_span_attributes (dict | None) – Optional dictionary of attributes to propagate as OpenTelemetry conversation span attributes.
cancel_on_idle_timeout (bool) – Whether the pipeline task should be cancelled if the idle timeout is reached.
check_dangling_tasks (bool) – Whether to check for processors’ tasks finishing properly.
clock (BaseClock | None) – Clock implementation for timing operations.
conversation_id (str | None) – Optional custom ID for the conversation.
enable_tracing (bool) – Whether to enable tracing.
enable_turn_tracking (bool) – Whether to enable turn tracking.
enable_watchdog_logging (bool) – Whether to print task processing times.
idle_timeout_frames (Tuple[Type[Frame], ...]) – A tuple with the frames that should trigger an idle timeout if not received withing idle_timeout_seconds.
idle_timeout_secs (float | None) – Timeout (in seconds) to consider pipeline idle or None. If a pipeline is idle the pipeline task will be cancelled automatically.
observers (List[BaseObserver] | None) – List of observers for monitoring pipeline execution.
watchdog_timeout_secs (float) – Watchdog timer timeout (in seconds). A warning will be logged if the watchdog timer is not reset before this timeout.
task_manager (BaseTaskManager | None)
- property params: PipelineParams
Returns the pipeline parameters of this task.
- property turn_tracking_observer: TurnTrackingObserver | None
Return the turn tracking observer if enabled.
- property turn_trace_observer: TurnTraceObserver | None
Return the turn trace observer if enabled.
- add_observer(observer)[source]
- Parameters:
observer (BaseObserver)
- async remove_observer(observer)[source]
- Parameters:
observer (BaseObserver)
- set_reached_upstream_filter(types)[source]
Sets which frames will be checked before calling the on_frame_reached_upstream event handler.
- Parameters:
types (Tuple[Type[Frame], ...])
- set_reached_downstream_filter(types)[source]
Sets which frames will be checked before calling the on_frame_reached_downstream event handler.
- Parameters:
types (Tuple[Type[Frame], ...])
- has_finished()[source]
Indicates whether the tasks has finished. That is, all processors have stopped.
- Return type:
bool
- async stop_when_done()[source]
This is a helper function that sends an EndFrame to the pipeline in order to stop the task after everything in it has been processed.
- async cancel()[source]
Stops the running pipeline immediately.
- async run(params)[source]
Starts and manages the pipeline execution until completion or cancellation.
- Parameters:
params (PipelineTaskParams)
- async queue_frame(frame)[source]
Queue a single frame to be pushed down the pipeline.
- Parameters:
frame (Frame) – The frame to be processed.
- async queue_frames(frames)[source]
Queues multiple frames to be pushed down the pipeline.
- Parameters:
frames (Iterable[Frame] | AsyncIterable[Frame]) – An iterable or async iterable of frames to be processed.