#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from abc import abstractmethod
from dataclasses import dataclass
from typing import AsyncIterable, Iterable
from pipecat.frames.frames import Frame
from pipecat.utils.base_object import BaseObject
@dataclass
class PipelineTaskParams:
    """Specific configuration for the pipeline task.

    Attributes:
        loop: The asyncio event loop supplied to the pipeline task.
    """

    loop: asyncio.AbstractEventLoop
class BasePipelineTask(BaseObject):
    """Abstract interface for a task that runs a pipeline.

    Subclasses must implement every method below. The interface covers
    running the pipeline, stopping it (gracefully or immediately), checking
    whether it has finished, and queueing frames into it.
    """

    @abstractmethod
    def has_finished(self) -> bool:
        """Indicate whether the task has finished.

        A task is finished once all processors have stopped.

        Returns:
            True if the task has finished, False otherwise.
        """

    @abstractmethod
    async def stop_when_done(self):
        """Stop the task after everything in the pipeline has been processed.

        This is a helper that sends an EndFrame to the pipeline so the task
        stops once all pending work has gone through.
        """

    @abstractmethod
    async def cancel(self):
        """Stop the running pipeline immediately."""

    @abstractmethod
    async def run(self, params: PipelineTaskParams):
        """Start running the given pipeline.

        Args:
            params: Configuration for this run of the pipeline task.
        """

    @abstractmethod
    async def queue_frame(self, frame: Frame):
        """Queue a frame to be pushed down the pipeline.

        Args:
            frame: The frame to queue.
        """

    @abstractmethod
    async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
        """Queue multiple frames to be pushed down the pipeline.

        Args:
            frames: A synchronous or asynchronous iterable of frames to queue.
        """