Source code for pipecat.utils.base_object

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import inspect
from abc import ABC
from typing import Optional

from loguru import logger

from pipecat.utils.utils import obj_count, obj_id


[docs] class BaseObject(ABC): def __init__(self, *, name: Optional[str] = None): self._id: int = obj_id() self._name = name or f"{self.__class__.__name__}#{obj_count(self)}" # Registered event handlers. self._event_handlers: dict = {} # Set of tasks being executed. When a task finishes running it gets # automatically removed from the set. When we cleanup we wait for all # event tasks still being executed. self._event_tasks = set() @property def id(self) -> int: return self._id @property def name(self) -> str: return self._name
[docs] async def cleanup(self): if self._event_tasks: event_names, tasks = zip(*self._event_tasks) logger.debug(f"{self} waiting on event handlers to finish {list(event_names)}...") await asyncio.wait(tasks)
[docs] def event_handler(self, event_name: str): def decorator(handler): self.add_event_handler(event_name, handler) return handler return decorator
[docs] def add_event_handler(self, event_name: str, handler): if event_name in self._event_handlers: self._event_handlers[event_name].append(handler) else: logger.warning(f"Event handler {event_name} not registered")
def _register_event_handler(self, event_name: str): if event_name not in self._event_handlers: self._event_handlers[event_name] = [] else: logger.warning(f"Event handler {event_name} not registered") async def _call_event_handler(self, event_name: str, *args, **kwargs): # If we haven't registered an event handler, we don't need to do # anything. if not self._event_handlers.get(event_name): return # Create the task. task = asyncio.create_task(self._run_task(event_name, *args, **kwargs)) # Add it to our list of event tasks. self._event_tasks.add((event_name, task)) # Remove the task from the event tasks list when the task completes. task.add_done_callback(self._event_task_finished) async def _run_task(self, event_name: str, *args, **kwargs): try: for handler in self._event_handlers[event_name]: if inspect.iscoroutinefunction(handler): await handler(self, *args, **kwargs) else: handler(self, *args, **kwargs) except Exception as e: logger.exception(f"Exception in event handler {event_name}: {e}") def _event_task_finished(self, task: asyncio.Task): tuple_to_remove = next((t for t in self._event_tasks if t[1] == task), None) if tuple_to_remove: self._event_tasks.discard(tuple_to_remove) def __str__(self): return self.name