Source code for pipecat.processors.async_generator

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
from typing import Any, AsyncGenerator

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer


class AsyncGeneratorProcessor(FrameProcessor):
    """Frame processor that exposes serialized frames via an async generator.

    Each frame received is pushed onward unchanged in the direction it
    arrived. In addition, the frame is run through the configured serializer
    and any truthy result is placed on an internal queue, from which
    :meth:`generator` yields it. A ``CancelFrame`` or ``EndFrame`` is not
    serialized; it enqueues a ``None`` sentinel that ends the generator.
    """

    def __init__(self, *, serializer: FrameSerializer, **kwargs):
        """Create the processor.

        Args:
            serializer: Object whose ``serialize`` coroutine is awaited for
                every frame that is not a ``CancelFrame``/``EndFrame``.
            **kwargs: Passed through to the parent class ``__init__``.
        """
        super().__init__(**kwargs)
        self._serializer = serializer
        # Hand-off buffer between process_frame() (producer) and
        # generator() (consumer). ``None`` is reserved as the stop sentinel.
        self._data_queue = asyncio.Queue()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Forward ``frame`` and queue its serialized form for the generator.

        Args:
            frame: The frame to handle.
            direction: Direction the frame is travelling; it is pushed onward
                in this same direction.
        """
        await super().process_frame(frame, direction)
        await self.push_frame(frame, direction)

        if isinstance(frame, (CancelFrame, EndFrame)):
            # Terminal frame: tell generator() to stop instead of serializing.
            await self._data_queue.put(None)
            return

        payload = await self._serializer.serialize(frame)
        # Falsy results (e.g. ``None`` or empty data) are dropped so they are
        # never confused with the stop sentinel.
        if payload:
            await self._data_queue.put(payload)

    async def generator(self) -> AsyncGenerator[Any, None]:
        """Yield queued serialized data until the stop sentinel is received.

        Yields:
            Each truthy item taken from the internal queue, in queue order.
            Iteration finishes once ``None`` is dequeued.
        """
        while True:
            item = await self._data_queue.get()
            if item is None:
                return
            if item:
                yield item