# Source code for pipecat.processors.aggregators.vision_image_frame

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from pipecat.frames.frames import Frame, InputImageRawFrame, TextFrame, VisionImageRawFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class VisionImageFrameAggregator(FrameProcessor):
    """Pair a text prompt with the next input image to build a vision frame.

    The aggregator remembers the text of the most recent ``TextFrame`` it
    receives. When an ``InputImageRawFrame`` arrives while a prompt is
    pending, a ``VisionImageRawFrame`` carrying that text together with the
    image's data, size and format is pushed, and the pending prompt is
    cleared.

    Frame handling summary:

    * ``TextFrame``: its text is stored (replacing any earlier pending
      text); the frame itself is not pushed.
    * ``InputImageRawFrame``: converted to a ``VisionImageRawFrame`` if a
      non-empty prompt is pending; otherwise nothing is pushed.
    * Any other frame: pushed unchanged in the direction it was travelling.
    """

    def __init__(self):
        """Create the aggregator with no prompt text pending."""
        super().__init__()
        # Text of the latest TextFrame that is still waiting for an image.
        self._describe_text = None

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Handle one frame, emitting a vision frame when text and image pair up.

        Args:
            frame: The frame to process.
            direction: The direction the frame is travelling; used when
                forwarding frames this aggregator does not handle itself.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            # Keep only the text; the TextFrame is consumed here.
            self._describe_text = frame.text
            return

        if not isinstance(frame, InputImageRawFrame):
            # Not part of the text/image pairing: pass it through untouched.
            await self.push_frame(frame, direction)
            return

        prompt = self._describe_text
        if not prompt:
            # An image with no pending (or empty) prompt is not forwarded.
            return

        vision_frame = VisionImageRawFrame(
            text=prompt,
            image=frame.image,
            size=frame.size,
            format=frame.format,
        )
        # Pushed without an explicit direction, i.e. push_frame's default.
        await self.push_frame(vision_frame)
        # The prompt is used up once it has been paired with an image.
        self._describe_text = None