Source code for pipecat.frames.frames

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from dataclasses import dataclass, field
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Tuple,
)

from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id

if TYPE_CHECKING:
    from pipecat.processors.frame_processor import FrameProcessor


class KeypadEntry(str, Enum):
    """DTMF entries."""

    ONE = "1"
    TWO = "2"
    THREE = "3"
    FOUR = "4"
    FIVE = "5"
    SIX = "6"
    SEVEN = "7"
    EIGHT = "8"
    NINE = "9"
    ZERO = "0"
    POUND = "#"
    STAR = "*"


def format_pts(pts: Optional[int]):
    return nanoseconds_to_str(pts) if pts else None


@dataclass
class Frame:
    """Base frame class."""

    id: int = field(init=False)
    name: str = field(init=False)
    pts: Optional[int] = field(init=False)
    metadata: Dict[str, Any] = field(init=False)
    transport_source: Optional[str] = field(init=False)
    transport_destination: Optional[str] = field(init=False)

    def __post_init__(self):
        self.id: int = obj_id()
        self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
        self.pts: Optional[int] = None
        self.metadata: Dict[str, Any] = {}
        self.transport_source: Optional[str] = None
        self.transport_destination: Optional[str] = None

    def __str__(self):
        return self.name
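

# Example (illustrative sketch, not part of the library): the bookkeeping
# fields above are populated automatically in __post_init__(), so Frame
# subclasses are constructed from their payload fields only.
#
#   >>> frame = Frame()
#   >>> isinstance(frame.id, int)
#   True
#   >>> frame.pts is None and frame.metadata == {}
#   True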


@dataclass
class SystemFrame(Frame):
    """System frames are frames that are not internally queued by any of the
    frame processors and should be processed immediately.
    """

    pass


@dataclass
class DataFrame(Frame):
    """Data frames are frames that will be processed in order and usually
    contain data such as LLM context, text, audio or images.
    """

    pass


@dataclass
class ControlFrame(Frame):
    """Control frames are frames that, similar to data frames, will be
    processed in order and usually contain control information such as frames
    to update settings or to end the pipeline.
    """

    pass


#
# Mixins
#


@dataclass
class AudioRawFrame:
    """A chunk of audio."""

    audio: bytes
    sample_rate: int
    num_channels: int
    num_frames: int = field(default=0, init=False)

    def __post_init__(self):
        self.num_frames = int(len(self.audio) / (self.num_channels * 2))
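

# Example (sketch): `num_frames` is derived from the byte length assuming
# 16-bit (2-byte) samples, hence the `num_channels * 2` divisor. 320 mono
# samples at 16 kHz is 20 ms of audio:
#
#   >>> chunk = AudioRawFrame(audio=b"\x00\x00" * 320, sample_rate=16000, num_channels=1)
#   >>> chunk.num_frames
#   320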


@dataclass
class ImageRawFrame:
    """A raw image."""

    image: bytes
    size: Tuple[int, int]
    format: Optional[str]


#
# Data frames
#


@dataclass
class OutputAudioRawFrame(DataFrame, AudioRawFrame):
    """A chunk of audio. Will be played by the output transport.

    If the transport supports multiple audio destinations (e.g. multiple
    audio tracks) the destination name can be specified.
    """

    def __post_init__(self):
        super().__post_init__()
        self.num_frames = int(len(self.audio) / (self.num_channels * 2))

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
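

# Example (sketch): output frames take the same constructor arguments as the
# AudioRawFrame mixin; `transport_destination` can be set afterwards when the
# transport exposes multiple audio tracks (the name "speaker-2" below is
# purely hypothetical).
#
#   >>> frame = OutputAudioRawFrame(audio=b"\x00\x00" * 240, sample_rate=24000, num_channels=1)
#   >>> frame.transport_destination = "speaker-2"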


@dataclass
class OutputImageRawFrame(DataFrame, ImageRawFrame):
    """An image that will be shown by the transport.

    If the transport supports multiple video destinations (e.g. multiple
    video tracks) the destination name can be specified.
    """

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"


@dataclass
class TTSAudioRawFrame(OutputAudioRawFrame):
    """A chunk of output audio generated by a TTS service."""

    pass


@dataclass
class URLImageRawFrame(OutputImageRawFrame):
    """An output image with an associated URL.

    These images are usually generated by third-party services that provide a
    URL to download the image.
    """

    url: Optional[str] = None

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"


@dataclass
class SpriteFrame(DataFrame):
    """An animated sprite. Will be shown by the transport if the transport's
    camera is enabled. Will play at the framerate specified in the
    transport's `camera_out_framerate` constructor parameter.
    """

    images: List[OutputImageRawFrame]

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, size: {len(self.images)})"


@dataclass
class TextFrame(DataFrame):
    """A chunk of text.

    Emitted by LLM services, consumed by TTS services, can be used to send
    text through processors.
    """

    text: str

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, text: [{self.text}])"
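

# Example (sketch): a TextFrame carries only its payload; id/name/pts come
# from the Frame base class.
#
#   >>> frame = TextFrame(text="Hello there!")
#   >>> frame.text
#   'Hello there!'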


@dataclass
class LLMTextFrame(TextFrame):
    """A text frame generated by LLM services."""

    pass


@dataclass
class TTSTextFrame(TextFrame):
    """A text frame generated by TTS services."""

    pass


@dataclass
class TranscriptionFrame(TextFrame):
    """A text frame with transcription-specific data.

    The `result` field contains the result from the STT service if available.
    """

    user_id: str
    timestamp: str
    language: Optional[Language] = None
    result: Optional[Any] = None

    def __str__(self):
        return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"


@dataclass
class InterimTranscriptionFrame(TextFrame):
    """A text frame with interim transcription-specific data.

    The `result` field contains the result from the STT service if available.
    """

    text: str
    user_id: str
    timestamp: str
    language: Optional[Language] = None
    result: Optional[Any] = None

    def __str__(self):
        return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"


@dataclass
class TranslationFrame(TextFrame):
    """A text frame with translated transcription data.

    Will be placed in the transport's receive queue when a participant
    speaks.
    """

    user_id: str
    timestamp: str
    language: Optional[Language] = None

    def __str__(self):
        return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"


@dataclass
class OpenAILLMContextAssistantTimestampFrame(DataFrame):
    """Timestamp information for assistant message in LLM context."""

    timestamp: str


@dataclass
class TranscriptionMessage:
    """A message in a conversation transcript containing the role and content.

    Messages are in standard format with roles normalized to user/assistant.
    """

    role: Literal["user", "assistant"]
    content: str
    user_id: Optional[str] = None
    timestamp: Optional[str] = None


@dataclass
class TranscriptionUpdateFrame(DataFrame):
    """A frame containing new messages added to the conversation transcript.

    This frame is emitted when new messages are added to the conversation
    history, containing only the newly added messages rather than the full
    transcript. Messages have normalized roles (user/assistant) regardless of
    the LLM service used. Messages are always in the OpenAI standard message
    format, which supports both:

    Simple format:
    [
        {"role": "user", "content": "Hi, how are you?"},
        {"role": "assistant", "content": "Great! And you?"},
    ]

    Content list format:
    [
        {"role": "user", "content": [{"type": "text", "text": "Hi, how are you?"}]},
        {"role": "assistant", "content": [{"type": "text", "text": "Great! And you?"}]},
    ]

    OpenAI supports both formats. Anthropic and Google messages are converted
    to the content list format.
    """

    messages: List[TranscriptionMessage]

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, messages: {len(self.messages)})"
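

# Example (sketch): building an update frame from normalized transcript
# messages; the user_id and timestamp values below are hypothetical.
#
#   >>> update = TranscriptionUpdateFrame(messages=[
#   ...     TranscriptionMessage(role="user", content="Hi, how are you?",
#   ...                          user_id="user-1", timestamp="2025-01-01T00:00:00Z"),
#   ...     TranscriptionMessage(role="assistant", content="Great! And you?"),
#   ... ])
#   >>> len(update.messages)
#   2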


@dataclass
class LLMMessagesFrame(DataFrame):
    """A frame containing a list of LLM messages.

    Used to signal that an LLM service should run a chat completion and emit
    an LLMFullResponseStartFrame, TextFrames and an LLMFullResponseEndFrame.
    Note that the `messages` property in this class is mutable, and will be
    updated by various aggregators.
    """

    messages: List[dict]


@dataclass
class LLMMessagesAppendFrame(DataFrame):
    """A frame containing a list of LLM messages that need to be added to the
    current context.
    """

    messages: List[dict]


@dataclass
class LLMMessagesUpdateFrame(DataFrame):
    """A frame containing a list of new LLM messages.

    These messages will replace the current context LLM messages and should
    generate a new LLMMessagesFrame.
    """

    messages: List[dict]
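

# Example (sketch): both frames carry plain OpenAI-style message dicts;
# append extends the current context while update replaces it wholesale.
#
#   >>> append = LLMMessagesAppendFrame(messages=[{"role": "user", "content": "What time is it?"}])
#   >>> replace = LLMMessagesUpdateFrame(messages=[{"role": "system", "content": "Be terse."}])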


@dataclass
class LLMSetToolsFrame(DataFrame):
    """A frame containing a list of tools for an LLM to use for function
    calling.

    The specific format depends on the LLM being used, but it should
    typically contain JSON Schema objects.
    """

    tools: List[dict]


@dataclass
class LLMSetToolChoiceFrame(DataFrame):
    """A frame containing a tool choice for an LLM to use for function
    calling.
    """

    tool_choice: Literal["none", "auto", "required"] | dict
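

# Example (sketch): tools are typically JSON Schema function definitions in
# whatever format the active LLM expects; the "get_weather" tool below is
# hypothetical.
#
#   >>> tools = LLMSetToolsFrame(tools=[{
#   ...     "type": "function",
#   ...     "function": {
#   ...         "name": "get_weather",
#   ...         "parameters": {
#   ...             "type": "object",
#   ...             "properties": {"city": {"type": "string"}},
#   ...         },
#   ...     },
#   ... }])
#   >>> choice = LLMSetToolChoiceFrame(tool_choice="auto")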


@dataclass
class LLMEnablePromptCachingFrame(DataFrame):
    """A frame to enable/disable prompt caching in certain LLMs."""

    enable: bool


@dataclass
class TTSSpeakFrame(DataFrame):
    """A frame that contains a text that should be spoken by the TTS in the
    pipeline (if any).
    """

    text: str


@dataclass
class TransportMessageFrame(DataFrame):
    """A frame containing a message to be sent by the transport."""

    message: Any

    def __str__(self):
        return f"{self.name}(message: {self.message})"


@dataclass
class DTMFFrame:
    """A DTMF button frame."""

    button: KeypadEntry


@dataclass
class OutputDTMFFrame(DTMFFrame, DataFrame):
    """A DTMF keypress output that will be queued.

    If your transport supports multiple dial-out destinations, use the
    `transport_destination` field to specify where the DTMF keypress should
    be sent.
    """

    pass
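

# Example (sketch): queueing a DTMF "5" keypress; the destination name below
# is hypothetical and only meaningful on transports with multiple dial-outs.
#
#   >>> press = OutputDTMFFrame(button=KeypadEntry.FIVE)
#   >>> press.transport_destination = "dial-out-1"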


#
# System frames
#


@dataclass
class StartFrame(SystemFrame):
    """This is the first frame that should be pushed down a pipeline."""

    audio_in_sample_rate: int = 16000
    audio_out_sample_rate: int = 24000
    allow_interruptions: bool = False
    enable_metrics: bool = False
    enable_usage_metrics: bool = False
    report_only_initial_ttfb: bool = False
    interruption_strategies: List[BaseInterruptionStrategy] = field(default_factory=list)
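

# Example (sketch): a StartFrame is normally created by the pipeline task,
# but constructing one directly shows the knobs it carries; the overrides
# below are illustrative, not recommended values.
#
#   >>> start = StartFrame(audio_in_sample_rate=8000, allow_interruptions=True,
#   ...                    enable_metrics=True)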


@dataclass
class CancelFrame(SystemFrame):
    """Indicates that a pipeline needs to stop right away."""

    pass


@dataclass
class ErrorFrame(SystemFrame):
    """This is used to notify upstream that an error has occurred downstream
    in the pipeline.

    A fatal error indicates the error is unrecoverable and that the bot
    should exit.
    """

    error: str
    fatal: bool = False

    def __str__(self):
        return f"{self.name}(error: {self.error}, fatal: {self.fatal})"


@dataclass
class FatalErrorFrame(ErrorFrame):
    """This is used to notify upstream that an unrecoverable error has
    occurred and that the bot should exit.
    """

    fatal: bool = field(default=True, init=False)


@dataclass
class EndTaskFrame(SystemFrame):
    """This is used to notify the pipeline task that the pipeline should be
    closed nicely (flushing all the queued frames) by pushing an EndFrame
    downstream.
    """

    pass


@dataclass
class CancelTaskFrame(SystemFrame):
    """This is used to notify the pipeline task that the pipeline should be
    stopped immediately by pushing a CancelFrame downstream.
    """

    pass


@dataclass
class StopTaskFrame(SystemFrame):
    """This is used to notify the pipeline task that it should be stopped as
    soon as possible (flushing all the queued frames) but that the pipeline
    processors should be kept in a running state.
    """

    pass


@dataclass
class FrameProcessorPauseUrgentFrame(SystemFrame):
    """This frame is used to pause frame processing for the given processor
    as fast as possible.

    Pausing frame processing will keep frames in the internal queue which
    will then be processed when frame processing is resumed with
    `FrameProcessorResumeFrame`.
    """

    processor: "FrameProcessor"


@dataclass
class FrameProcessorResumeUrgentFrame(SystemFrame):
    """This frame is used to resume frame processing, as fast as possible,
    for the given processor if it was previously paused.

    After resuming frame processing all queued frames will be processed in
    the order received.
    """

    processor: "FrameProcessor"


@dataclass
class StartInterruptionFrame(SystemFrame):
    """Emitted by VAD to indicate that a user has started speaking (i.e. an
    interruption).

    This is similar to UserStartedSpeakingFrame except that it should be
    pushed concurrently with other frames (so the order is not guaranteed).
    """

    pass


@dataclass
class StopInterruptionFrame(SystemFrame):
    """Emitted by VAD to indicate that a user has stopped speaking (i.e. no
    more interruptions).

    This is similar to UserStoppedSpeakingFrame except that it should be
    pushed concurrently with other frames (so the order is not guaranteed).
    """

    pass


@dataclass
class UserStartedSpeakingFrame(SystemFrame):
    """Emitted by VAD to indicate that a user has started speaking.

    This can be used for interruptions or other times when detecting that
    someone is speaking is more important than knowing what they're saying
    (as you would with a TranscriptionFrame).
    """

    emulated: bool = False


@dataclass
class UserStoppedSpeakingFrame(SystemFrame):
    """Emitted by the VAD to indicate that a user stopped speaking."""

    emulated: bool = False


@dataclass
class EmulateUserStartedSpeakingFrame(SystemFrame):
    """Emitted by internal processors upstream to emulate VAD behavior when a
    user starts speaking.
    """

    pass


@dataclass
class EmulateUserStoppedSpeakingFrame(SystemFrame):
    """Emitted by internal processors upstream to emulate VAD behavior when a
    user stops speaking.
    """

    pass


@dataclass
class VADUserStartedSpeakingFrame(SystemFrame):
    """Frame emitted when VAD detects the user has definitively started
    speaking.
    """

    pass


@dataclass
class VADUserStoppedSpeakingFrame(SystemFrame):
    """Frame emitted when VAD detects the user has definitively stopped
    speaking.
    """

    pass


@dataclass
class BotInterruptionFrame(SystemFrame):
    """Emitted when the bot should be interrupted.

    This will mainly cause the same actions as if the user interrupted,
    except that the UserStartedSpeakingFrame and UserStoppedSpeakingFrame
    won't be generated.
    """

    pass


@dataclass
class BotStartedSpeakingFrame(SystemFrame):
    """Emitted upstream by transport outputs to indicate the bot started
    speaking.
    """

    pass


@dataclass
class BotStoppedSpeakingFrame(SystemFrame):
    """Emitted upstream by transport outputs to indicate the bot stopped
    speaking.
    """

    pass


@dataclass
class BotSpeakingFrame(SystemFrame):
    """Emitted upstream by transport outputs while the bot is still speaking.

    This can be used, for example, to detect when a user is idle. That is,
    while the bot is speaking we don't want to trigger any user idle timeout
    since the user might be listening.
    """

    pass


@dataclass
class MetricsFrame(SystemFrame):
    """Emitted by processors that can compute metrics like latencies."""

    data: List[MetricsData]


@dataclass
class FunctionCallFromLLM:
    """Represents a function call returned by the LLM to be registered for
    execution.

    Attributes:
        function_name (str): The name of the function.
        tool_call_id (str): A unique identifier for the function call.
        arguments (Mapping[str, Any]): The arguments for the function.
        context (OpenAILLMContext): The LLM context.
    """

    function_name: str
    tool_call_id: str
    arguments: Mapping[str, Any]
    context: Any
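

# Example (sketch): describing a single tool call parsed from an LLM
# response. The function name, call id, and arguments are hypothetical.
#
#   >>> call = FunctionCallFromLLM(
#   ...     function_name="get_weather",
#   ...     tool_call_id="call_123",
#   ...     arguments={"city": "Lisbon"},
#   ...     context=None,
#   ... )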


@dataclass
class FunctionCallsStartedFrame(SystemFrame):
    """A frame signaling that one or more function calls are about to start
    executing.
    """

    function_calls: Sequence[FunctionCallFromLLM]


@dataclass
class FunctionCallInProgressFrame(SystemFrame):
    """A frame signaling that a function call is in progress."""

    function_name: str
    tool_call_id: str
    arguments: Any
    cancel_on_interruption: bool = False


@dataclass
class FunctionCallCancelFrame(SystemFrame):
    """A frame to signal a function call has been cancelled."""

    function_name: str
    tool_call_id: str


@dataclass
class FunctionCallResultProperties:
    """Properties for a function call result frame."""

    run_llm: Optional[bool] = None
    on_context_updated: Optional[Callable[[], Awaitable[None]]] = None


@dataclass
class FunctionCallResultFrame(SystemFrame):
    """A frame containing the result of an LLM function (tool) call."""

    function_name: str
    tool_call_id: str
    arguments: Any
    result: Any
    run_llm: Optional[bool] = None
    properties: Optional[FunctionCallResultProperties] = None
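

# Example (sketch): returning a tool result to the LLM; setting
# `run_llm=False` in the properties asks not to trigger another completion.
# All values below are hypothetical.
#
#   >>> result = FunctionCallResultFrame(
#   ...     function_name="get_weather",
#   ...     tool_call_id="call_123",
#   ...     arguments={"city": "Lisbon"},
#   ...     result={"temperature_c": 21},
#   ...     properties=FunctionCallResultProperties(run_llm=False),
#   ... )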


@dataclass
class STTMuteFrame(SystemFrame):
    """System frame to mute/unmute the STT service."""

    mute: bool


@dataclass
class TransportMessageUrgentFrame(SystemFrame):
    """A frame containing a message to be sent right away by the transport."""

    message: Any

    def __str__(self):
        return f"{self.name}(message: {self.message})"


@dataclass
class UserImageRequestFrame(SystemFrame):
    """A frame to request an image from the given user.

    The frame might be generated by a function call, in which case the
    corresponding fields will be properly set.
    """

    user_id: str
    context: Optional[Any] = None
    function_name: Optional[str] = None
    tool_call_id: Optional[str] = None
    video_source: Optional[str] = None

    def __str__(self):
        return f"{self.name}(user: {self.user_id}, video_source: {self.video_source}, function: {self.function_name}, request: {self.tool_call_id})"


@dataclass
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
    """A chunk of audio usually coming from an input transport.

    If the transport supports multiple audio sources (e.g. multiple audio
    tracks) the source name will be specified.
    """

    def __post_init__(self):
        super().__post_init__()
        self.num_frames = int(len(self.audio) / (self.num_channels * 2))

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"


@dataclass
class InputImageRawFrame(SystemFrame, ImageRawFrame):
    """An image usually coming from an input transport.

    If the transport supports multiple video sources (e.g. multiple video
    tracks) the source name will be specified.
    """

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"


@dataclass
class UserAudioRawFrame(InputAudioRawFrame):
    """A chunk of audio, usually coming from an input transport, associated
    to a user.
    """

    user_id: str = ""

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"


@dataclass
class UserImageRawFrame(InputImageRawFrame):
    """An image associated to a user."""

    user_id: str = ""
    request: Optional[UserImageRequestFrame] = None

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"


@dataclass
class VisionImageRawFrame(InputImageRawFrame):
    """An image with an associated text to ask for a description of it."""

    text: Optional[str] = None

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})"


@dataclass
class InputDTMFFrame(DTMFFrame, SystemFrame):
    """A DTMF keypress input."""

    pass


@dataclass
class OutputDTMFUrgentFrame(DTMFFrame, SystemFrame):
    """A DTMF keypress output that will be sent right away.

    If your transport supports multiple dial-out destinations, use the
    `transport_destination` field to specify where the DTMF keypress should
    be sent.
    """

    pass


#
# Control frames
#


@dataclass
class EndFrame(ControlFrame):
    """Indicates that a pipeline has ended and frame processors and pipelines
    should be shut down.

    If the transport receives this frame, it will stop sending frames to its
    output channel(s) and close all its threads. Note that this is a control
    frame, which means it will be received in the order it was sent (unlike
    system frames).
    """

    pass


@dataclass
class StopFrame(ControlFrame):
    """Indicates that a pipeline should be stopped but that the pipeline
    processors should be kept in a running state.

    This is normally queued from the pipeline task.
    """

    pass


@dataclass
class HeartbeatFrame(ControlFrame):
    """This frame is used by the pipeline task as a mechanism to know if the
    pipeline is running properly.
    """

    timestamp: int


@dataclass
class FrameProcessorPauseFrame(ControlFrame):
    """This frame is used to pause frame processing for the given processor.

    Pausing frame processing will keep frames in the internal queue which
    will then be processed when frame processing is resumed with
    `FrameProcessorResumeFrame`.
    """

    processor: "FrameProcessor"


@dataclass
class FrameProcessorResumeFrame(ControlFrame):
    """This frame is used to resume frame processing for the given processor
    if it was previously paused.

    After resuming frame processing all queued frames will be processed in
    the order received.
    """

    processor: "FrameProcessor"


@dataclass
class LLMFullResponseStartFrame(ControlFrame):
    """Used to indicate the beginning of an LLM response.

    Followed by one or more TextFrames and a final LLMFullResponseEndFrame.
    """

    pass


@dataclass
class LLMFullResponseEndFrame(ControlFrame):
    """Indicates the end of an LLM response."""

    pass


@dataclass
class TTSStartedFrame(ControlFrame):
    """Used to indicate the beginning of a TTS response.

    The TTSAudioRawFrames that follow are part of the TTS response until a
    TTSStoppedFrame. These frames can be used for aggregating audio frames in
    a transport to optimize the size of frames sent to the session, without
    needing to control this in the TTS service.
    """

    pass


@dataclass
class TTSStoppedFrame(ControlFrame):
    """Indicates the end of a TTS response."""

    pass


@dataclass
class ServiceUpdateSettingsFrame(ControlFrame):
    """A control frame containing a request to update service settings."""

    settings: Mapping[str, Any]


@dataclass
class LLMUpdateSettingsFrame(ServiceUpdateSettingsFrame):
    """A control frame containing a request to update LLM service settings."""

    pass


@dataclass
class TTSUpdateSettingsFrame(ServiceUpdateSettingsFrame):
    """A control frame containing a request to update TTS service settings."""

    pass


@dataclass
class STTUpdateSettingsFrame(ServiceUpdateSettingsFrame):
    """A control frame containing a request to update STT service settings."""

    pass
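

# Example (sketch): the per-service subclasses all share the same shape; the
# settings keys depend on the concrete service implementation, so "voice" and
# "model" below are hypothetical.
#
#   >>> tts_update = TTSUpdateSettingsFrame(settings={"voice": "alloy"})
#   >>> llm_update = LLMUpdateSettingsFrame(settings={"model": "gpt-4o"})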


@dataclass
class VADParamsUpdateFrame(ControlFrame):
    """A control frame containing a request to update VAD params.

    Intended to be pushed upstream from the RTVI processor.
    """

    params: VADParams


@dataclass
class FilterControlFrame(ControlFrame):
    """Base control frame for other audio filter frames."""

    pass


@dataclass
class FilterUpdateSettingsFrame(FilterControlFrame):
    """Control frame to update filter settings."""

    settings: Mapping[str, Any]


@dataclass
class FilterEnableFrame(FilterControlFrame):
    """Control frame to enable or disable the filter at runtime."""

    enable: bool


@dataclass
class MixerControlFrame(ControlFrame):
    """Base control frame for other audio mixer frames."""

    pass


@dataclass
class MixerUpdateSettingsFrame(MixerControlFrame):
    """Control frame to update mixer settings."""

    settings: Mapping[str, Any]


@dataclass
class MixerEnableFrame(MixerControlFrame):
    """Control frame to enable or disable the mixer at runtime."""

    enable: bool