TaskObserver
- class pipecat.pipeline.task_observer.Proxy(queue, task, observer)[source]
Bases:
object
This is the data we receive from the main observer and that we put into a queue for later processing.
- Parameters:
queue (Queue)
task (Task)
observer (BaseObserver)
- queue: Queue
- task: Task
- observer: BaseObserver
- class pipecat.pipeline.task_observer.TaskObserver(*, observers=None, task_manager, **kwargs)[source]
Bases:
BaseObserver
This is a pipeline frame observer that is meant to be used as a proxy to the user provided observers. That is, this is the observer that should be passed to the frame processors. Then, every time a frame is pushed this observer will call all the observers registered to the pipeline task.
This observer makes sure that passing frames to observers doesn’t block the pipeline by creating a queue and a task for each user observer. When a frame is received, it will be put in a queue for efficiency and later processed by each task.
- Parameters:
observers (List[BaseObserver] | None)
task_manager (BaseTaskManager)
- add_observer(observer)[source]
- Parameters:
observer (BaseObserver)
- async remove_observer(observer)[source]
- Parameters:
observer (BaseObserver)
- async start()[source]
Starts all proxy observer tasks.
- async stop()[source]
Stops all proxy observer tasks.
- async on_push_frame(data)[source]
Handle the event when a frame is pushed from one processor to another.
This method should be implemented by subclasses to define specific behavior (e.g., logging, monitoring, debugging) when a frame is transferred through the pipeline.
- Parameters:
data (FramePushed) – The event data containing details about the frame transfer.