Gated

class pipecat.processors.aggregators.gated.GatedAggregator(gate_open_fn, gate_close_fn, start_open, direction=FrameDirection.DOWNSTREAM)

Bases: FrameProcessor

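Accumulates frames, using custom functions to start and stop accumulation. While the gate is closed, incoming frames are held. When a frame opens the gate, that frame is yielded first, followed by any accumulated frames. Subsequent frames are then yielded up to, but not including, the gate-closing frame.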
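The gating behaviour described above can be modelled without pipecat at all. The following is a minimal plain-Python sketch, not the library's implementation: the class name `GateSketch`, its synchronous `push` method, and the use of strings in place of frames are all assumptions made for illustration. It also assumes the gate-closing frame is held with later frames rather than dropped, which the description does not state.

```python
from typing import Any, Callable, List


class GateSketch:
    """Hypothetical stand-in for the gating logic; not the pipecat class."""

    def __init__(
        self,
        gate_open_fn: Callable[[Any], bool],
        gate_close_fn: Callable[[Any], bool],
        start_open: bool,
    ) -> None:
        self._gate_open_fn = gate_open_fn
        self._gate_close_fn = gate_close_fn
        self._gate_open = start_open
        self._held: List[Any] = []

    def push(self, frame: Any) -> List[Any]:
        """Return the frames released by this input, in order."""
        # Update the gate state first, so the triggering frame is
        # treated according to the new state.
        if self._gate_open:
            self._gate_open = not self._gate_close_fn(frame)
        else:
            self._gate_open = self._gate_open_fn(frame)

        if not self._gate_open:
            # Gate closed: hold the frame (assumption: the closing
            # frame itself is held too, not discarded).
            self._held.append(frame)
            return []

        # Gate open: the current frame goes first, then anything held.
        released = [frame] + self._held
        self._held = []
        return released


# Strings stand in for frames: "IMAGE" opens the gate, "LLM_START" closes it.
gate = GateSketch(
    gate_open_fn=lambda f: f == "IMAGE",
    gate_close_fn=lambda f: f == "LLM_START",
    start_open=False,
)
for item in ["Hello", "Hello again.", "IMAGE", "Goodbye.", "LLM_START"]:
    print(item, "->", gate.push(item))
```

Evaluating the predicates before deciding what to release is what makes the gate-opening frame come out ahead of the held frames, and what keeps the gate-closing frame from being released.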

Doctest: FIXME to work with asyncio

>>> from pipecat.frames.frames import ImageRawFrame

>>> async def print_frames(aggregator, frame):
...     async for frame in aggregator.process_frame(frame):
...         if isinstance(frame, TextFrame):
...             print(frame.text)
...         else:
...             print(frame.__class__.__name__)
>>> aggregator = GatedAggregator(
...     gate_close_fn=lambda x: isinstance(x, LLMResponseStartFrame),
...     gate_open_fn=lambda x: isinstance(x, ImageRawFrame),
...     start_open=False)
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello")))
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello again.")))
>>> asyncio.run(print_frames(aggregator, ImageRawFrame(image=bytes([]), size=(0, 0))))
ImageRawFrame
Hello
Hello again.
>>> asyncio.run(print_frames(aggregator, TextFrame("Goodbye.")))
Goodbye.
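
Parameters:
  • gate_open_fn: callable that takes a frame and returns True if that frame should open the gate

  • gate_close_fn: callable that takes a frame and returns True if that frame should close the gate

  • start_open: whether the gate is open when the aggregator is created

  • direction (FrameDirection)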

async process_frame(frame, direction)
Parameters:
  • frame (Frame)

  • direction (FrameDirection)