TaskObserver

class pipecat.pipeline.task_observer.Proxy(queue, task, observer)[source]

Bases: object

The data received from the main observer and placed on a queue for later processing.

Parameters:
  • queue (Queue)

  • task (Task)

  • observer (BaseObserver)

queue: Queue
task: Task
observer: BaseObserver

class pipecat.pipeline.task_observer.TaskObserver(*, observers=None, task_manager, **kwargs)[source]

Bases: BaseObserver

A pipeline frame observer that acts as a proxy to the user-provided observers. This is the observer that should be passed to the frame processors. Every time a frame is pushed, it calls all the observers registered with the pipeline task.

To keep observers from blocking the pipeline, this observer creates a queue and a task for each user observer. When a frame is received, it is put on each observer’s queue and processed later by that observer’s task.

Parameters:
  • observers (List[BaseObserver] | None)

  • task_manager (BaseTaskManager)
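
The queue-and-task-per-observer pattern described above can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not pipecat's implementation: FanOutObserver and RecordingObserver are hypothetical names, the events are plain strings rather than FramePushed objects, and no task manager is involved.

```python
import asyncio


class RecordingObserver:
    """Hypothetical user observer that records every event it receives."""

    def __init__(self):
        self.seen = []

    async def on_push_frame(self, data):
        await asyncio.sleep(0.01)  # simulate slow work, e.g. writing a log
        self.seen.append(data)


class FanOutObserver:
    """Illustrative stand-in for the proxy pattern (not pipecat's code)."""

    def __init__(self, observers):
        self._observers = list(observers)
        self._proxies = []  # one (queue, task, observer) tuple per observer

    async def start(self):
        for observer in self._observers:
            queue = asyncio.Queue()
            task = asyncio.create_task(self._drain(queue, observer))
            self._proxies.append((queue, task, observer))

    async def stop(self):
        for queue, task, _ in self._proxies:
            await queue.join()  # wait until queued events are processed
            task.cancel()
        tasks = [task for _, task, _ in self._proxies]
        await asyncio.gather(*tasks, return_exceptions=True)
        self._proxies.clear()

    async def on_push_frame(self, data):
        # Enqueue only; a slow observer never delays the caller.
        for queue, _, _ in self._proxies:
            queue.put_nowait(data)

    async def _drain(self, queue, observer):
        while True:
            data = await queue.get()
            try:
                await observer.on_push_frame(data)
            finally:
                queue.task_done()


async def main():
    observers = [RecordingObserver(), RecordingObserver()]
    fan_out = FanOutObserver(observers)
    await fan_out.start()
    for i in range(3):
        await fan_out.on_push_frame(f"frame-{i}")
    await fan_out.stop()
    return [observer.seen for observer in observers]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the caller of on_push_frame only pays for an enqueue, while each observer drains its own queue at its own pace, so one slow observer does not hold up the others.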

add_observer(observer)[source]
Parameters:

observer (BaseObserver)

async remove_observer(observer)[source]
Parameters:

observer (BaseObserver)

async start()[source]

Starts all proxy observer tasks.

async stop()[source]

Stops all proxy observer tasks.

async on_push_frame(data)[source]

Handle the event when a frame is pushed from one processor to another.

This method should be implemented by subclasses to define specific behavior (e.g., logging, monitoring, debugging) when a frame is transferred through the pipeline.

Parameters:

data (FramePushed) – The event data containing details about the frame transfer.
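
As a rough illustration of what a user observer's on_push_frame might do, the sketch below counts frames by type. It deliberately avoids importing pipecat: FramePushedStub is a hypothetical stand-in for FramePushed, its fields (source, destination, frame) are assumptions, and a real observer would subclass BaseObserver instead.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass


@dataclass
class FramePushedStub:
    """Hypothetical stand-in for FramePushed; the real fields may differ."""

    source: str
    destination: str
    frame: object


class FrameCounter:
    """Counts pushed frames by type; a real observer would subclass BaseObserver."""

    def __init__(self):
        self.counts = Counter()

    async def on_push_frame(self, data):
        # Key the tally on the class name of the frame being transferred.
        self.counts[type(data.frame).__name__] += 1


async def demo():
    counter = FrameCounter()
    for payload in ("hello", b"\x00\x01", "world"):
        await counter.on_push_frame(FramePushedStub("stt", "llm", payload))
    return dict(counter.counts)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because observers receive events through the proxy's queue, work like this tally runs off the pipeline's critical path.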