Gated
- class pipecat.processors.aggregators.gated.GatedAggregator(gate_open_fn, gate_close_fn, start_open, direction=FrameDirection.DOWNSTREAM)[source]
Bases:
FrameProcessor
Accumulate frames, with custom functions to start and stop accumulation. Yields gate-opening frame before any accumulated frames, then ensuing frames until and not including the gate-closed frame.
Doctest: FIXME to work with asyncio >>> from pipecat.frames.frames import ImageRawFrame
>>> async def print_frames(aggregator, frame): ... async for frame in aggregator.process_frame(frame): ... if isinstance(frame, TextFrame): ... print(frame.text) ... else: ... print(frame.__class__.__name__)
>>> aggregator = GatedAggregator( ... gate_close_fn=lambda x: isinstance(x, LLMResponseStartFrame), ... gate_open_fn=lambda x: isinstance(x, ImageRawFrame), ... start_open=False) >>> asyncio.run(print_frames(aggregator, TextFrame("Hello"))) >>> asyncio.run(print_frames(aggregator, TextFrame("Hello again."))) >>> asyncio.run(print_frames(aggregator, ImageRawFrame(image=bytes([]), size=(0, 0)))) ImageRawFrame Hello Hello again. >>> asyncio.run(print_frames(aggregator, TextFrame("Goodbye."))) Goodbye.
- Parameters:
direction (FrameDirection)
- async process_frame(frame, direction)[source]
- Parameters:
frame (Frame)
direction (FrameDirection)