Utils

class pipecat.tests.utils.SleepFrame(sleep=0.1)[source]

Bases: SystemFrame

This frame is used by test framework to introduce some sleep time before the next frame is pushed. This is useful to control system frames vs data or control frames.

Parameters:

sleep (float)

sleep: float = 0.1
class pipecat.tests.utils.HeartbeatsObserver(*, target, heartbeat_callback, **kwargs)[source]

Bases: BaseObserver

Parameters:
  • target (FrameProcessor)

  • heartbeat_callback (Callable[[FrameProcessor, HeartbeatFrame], Awaitable[None]])

async on_push_frame(data)[source]

Handle the event when a frame is pushed from one processor to another.

This method should be implemented by subclasses to define specific behavior (e.g., logging, monitoring, debugging) when a frame is transferred through the pipeline.

Parameters:

data (FramePushed) – The event data containing details about the frame transfer.

class pipecat.tests.utils.QueuedFrameProcessor(*, queue, queue_direction, ignore_start=True)[source]

Bases: FrameProcessor

Parameters:
  • queue (Queue)

  • queue_direction (FrameDirection)

  • ignore_start (bool)

async process_frame(frame, direction)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)

async pipecat.tests.utils.run_test(processor, *, frames_to_send, expected_down_frames=None, expected_up_frames=None, ignore_start=True, observers=None, start_metadata=None, send_end_frame=True)[source]
Parameters:
  • processor (FrameProcessor)

  • frames_to_send (Sequence[Frame])

  • expected_down_frames (Sequence[type] | None)

  • expected_up_frames (Sequence[type] | None)

  • ignore_start (bool)

  • observers (List[BaseObserver] | None)

  • start_metadata (Dict[str, Any] | None)

  • send_end_frame (bool)

Return type:

Tuple[Sequence[Frame], Sequence[Frame]]