#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import inspect
from typing import Dict, List, Optional
from attr import dataclass
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.utils.asyncio import BaseTaskManager
[docs]
@dataclass
class Proxy:
"""This is the data we receive from the main observer and that we put into
a queue for later processing.
"""
queue: asyncio.Queue
task: asyncio.Task
observer: BaseObserver
[docs]
class TaskObserver(BaseObserver):
"""This is a pipeline frame observer that is meant to be used as a proxy to
the user provided observers. That is, this is the observer that should be
passed to the frame processors. Then, every time a frame is pushed this
observer will call all the observers registered to the pipeline task.
This observer makes sure that passing frames to observers doesn't block the
pipeline by creating a queue and a task for each user observer. When a frame
is received, it will be put in a queue for efficiency and later processed by
each task.
"""
def __init__(
self,
*,
observers: Optional[List[BaseObserver]] = None,
task_manager: BaseTaskManager,
**kwargs,
):
super().__init__(**kwargs)
self._observers = observers or []
self._task_manager = task_manager
self._proxies: Optional[Dict[BaseObserver, Proxy]] = (
None # Becomes a dict after start() is called
)
[docs]
def add_observer(self, observer: BaseObserver):
# Add the observer to the list.
self._observers.append(observer)
# If we already started, create a new proxy for the observer.
# Otherwise, it will be created in start().
if self._started():
proxy = self._create_proxy(observer)
self._proxies[observer] = proxy
[docs]
async def remove_observer(self, observer: BaseObserver):
# If the observer has a proxy, remove it.
if observer in self._proxies:
proxy = self._proxies[observer]
# Remove the proxy so it doesn't get called anymore.
del self._proxies[observer]
# Cancel the proxy task right away.
await self._task_manager.cancel_task(proxy.task)
# Remove the observer from the list.
if observer in self._observers:
self._observers.remove(observer)
[docs]
async def start(self):
"""Starts all proxy observer tasks."""
self._proxies = self._create_proxies(self._observers)
[docs]
async def stop(self):
"""Stops all proxy observer tasks."""
for proxy in self._proxies.values():
await self._task_manager.cancel_task(proxy.task)
[docs]
async def on_push_frame(self, data: FramePushed):
for proxy in self._proxies.values():
await proxy.queue.put(data)
def _started(self) -> bool:
return self._proxies is not None
def _create_proxy(self, observer: BaseObserver) -> Proxy:
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer}::_proxy_task_handler",
)
proxy = Proxy(queue=queue, task=task, observer=observer)
return proxy
def _create_proxies(self, observers: List[BaseObserver]) -> Dict[BaseObserver, Proxy]:
proxies = {}
for observer in observers:
proxy = self._create_proxy(observer)
proxies[observer] = proxy
return proxies
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
warning_reported = False
while True:
data = await queue.get()
signature = inspect.signature(observer.on_push_frame)
if len(signature.parameters) > 1:
if not warning_reported:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, us `on_push_frame(data: FramePushed)` instead.",
DeprecationWarning,
)
warning_reported = True
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
else:
await observer.on_push_frame(data)
queue.task_done()