Source code for pipecat.processors.frameworks.rtvi

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import base64
from dataclasses import dataclass
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Literal,
    Mapping,
    Optional,
    Union,
)

from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError

from pipecat.frames.frames import (
    BotInterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    CancelFrame,
    DataFrame,
    EndFrame,
    EndTaskFrame,
    ErrorFrame,
    Frame,
    FunctionCallResultFrame,
    InputAudioRawFrame,
    InterimTranscriptionFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMTextFrame,
    MetricsFrame,
    StartFrame,
    SystemFrame,
    TranscriptionFrame,
    TransportMessageUrgentFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
    TTSTextFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import (
    LLMUsageMetricsData,
    ProcessingMetricsData,
    TTFBMetricsData,
    TTSUsageMetricsData,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
    OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import (
    FunctionCallParams,  # TODO(aleix): we shouldn't import `services` from `processors`
)
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport
from pipecat.utils.string import match_endofsentence

# RTVI protocol version string reported to the client in the "bot-ready" message.
RTVI_PROTOCOL_VERSION = "0.3.0"

# Value of the `label` field carried by RTVI messages. Incoming transport
# messages whose label differs are ignored by RTVIProcessor.
RTVI_MESSAGE_LABEL = "rtvi-ai"
RTVIMessageLiteral = Literal["rtvi-ai"]

# Types an action handler may return; also the type of an action response's `result`.
ActionResult = Union[bool, int, float, str, list, dict]


class RTVIServiceOption(BaseModel):
    """One configurable option exposed by an RTVI service.

    The `handler` coroutine is awaited by `RTVIProcessor` as
    `handler(processor, service_name, option_config)` whenever a value for
    this option is applied.
    """

    name: str
    type: Literal["bool", "number", "string", "array", "object"]
    # Excluded from serialization: a callable cannot be dumped into a message.
    handler: Callable[
        ["RTVIProcessor", str, "RTVIServiceOptionConfig"],
        Awaitable[None],
    ] = Field(exclude=True)
class RTVIService(BaseModel):
    """A named RTVI service together with the options it exposes.

    After validation, `_options_dict` maps each option name to its
    `RTVIServiceOption` so that option handlers can be looked up by name.
    If two options share a name, the later one wins.
    """

    name: str
    options: List[RTVIServiceOption]
    # Use a factory rather than a shared `{}` literal as the default.
    _options_dict: Dict[str, RTVIServiceOption] = PrivateAttr(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        """Build the name -> option lookup table once the model is validated."""
        self._options_dict = {option.name: option for option in self.options}
        return super().model_post_init(__context)
class RTVIActionArgumentData(BaseModel):
    """Name/value pair describing the data of an action argument."""

    name: str
    value: Any
class RTVIActionArgument(BaseModel):
    """Declaration of one argument accepted by an `RTVIAction`.

    `type` is a tag naming the kind of value the argument carries.
    """

    name: str
    type: Literal["bool", "number", "string", "array", "object"]
class RTVIAction(BaseModel):
    """An action that a client can ask the bot to run.

    The `handler` coroutine is awaited by `RTVIProcessor` as
    `handler(processor, service_name, arguments)`, where `arguments` maps
    argument names to the values supplied by the client. Its return value is
    sent back as the action result.

    After validation, `_arguments_dict` maps each declared argument name to
    its `RTVIActionArgument`. If two arguments share a name, the later one
    wins.
    """

    service: str
    action: str
    arguments: List[RTVIActionArgument] = Field(default_factory=list)
    result: Literal["bool", "number", "string", "array", "object"]
    # Excluded from serialization: a callable cannot be dumped into a message.
    handler: Callable[["RTVIProcessor", str, Dict[str, Any]], Awaitable[ActionResult]] = Field(
        exclude=True
    )
    # Use a factory rather than a shared `{}` literal as the default.
    _arguments_dict: Dict[str, RTVIActionArgument] = PrivateAttr(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        """Build the name -> argument lookup table once the model is validated."""
        self._arguments_dict = {arg.name: arg for arg in self.arguments}
        return super().model_post_init(__context)
class RTVIServiceOptionConfig(BaseModel):
    """A value assigned to a single service option, identified by option name."""

    name: str
    value: Any
class RTVIServiceConfig(BaseModel):
    """Option values for one service, identified by service name."""

    service: str
    options: list[RTVIServiceOptionConfig]
class RTVIConfig(BaseModel):
    """Complete RTVI configuration: one `RTVIServiceConfig` entry per service."""

    config: list[RTVIServiceConfig]
#
# Client -> Pipecat messages.
#
class RTVIUpdateConfig(BaseModel):
    """Payload of an "update-config" client message.

    When `interrupt` is true, the processor interrupts the bot before the
    new configuration is applied.
    """

    config: list[RTVIServiceConfig]
    interrupt: bool = False
class RTVIActionRunArgument(BaseModel):
    """Name/value pair supplied by the client for one action argument."""

    name: str
    value: Any
class RTVIActionRun(BaseModel):
    """Payload of an "action" client message: which action to run and with what.

    `service` and `action` together identify the registered action;
    `arguments` may be omitted.
    """

    service: str
    action: str
    arguments: list[RTVIActionRunArgument] | None = None
@dataclass
class RTVIActionFrame(DataFrame):
    """Data frame asking the RTVI processor to run an action.

    `message_id` is the id of the originating client message. It is used as
    the id of the action response, and no response is sent when it is unset.
    """

    rtvi_action_run: RTVIActionRun
    message_id: str | None = None
class RTVIMessage(BaseModel):
    """Generic envelope of an incoming RTVI client message.

    `type` selects how the processor handles the message, `id` is echoed back
    in responses, and `data` carries the optional type-specific payload.
    """

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: str
    id: str
    data: dict[str, Any] | None = None
#
# Pipecat -> Client responses and messages.
#
class RTVIErrorResponseData(BaseModel):
    """Payload of an "error-response" message: the error description."""

    error: str
class RTVIErrorResponse(BaseModel):
    """Error reply to a specific client request; `id` is that request's id."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["error-response"] = "error-response"
    id: str
    data: RTVIErrorResponseData
class RTVIErrorData(BaseModel):
    """Payload of an "error" message: the error text and whether it is fatal."""

    error: str
    fatal: bool
class RTVIError(BaseModel):
    """Error message not tied to a client request (built from an `ErrorFrame`)."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["error"] = "error"
    data: RTVIErrorData
class RTVIDescribeConfigData(BaseModel):
    """Payload of a "config-available" message: the registered services."""

    config: list[RTVIService]
class RTVIDescribeConfig(BaseModel):
    """Reply to "describe-config"; `id` is the id of the request."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["config-available"] = "config-available"
    id: str
    data: RTVIDescribeConfigData
class RTVIDescribeActionsData(BaseModel):
    """Payload of an "actions-available" message: the registered actions."""

    actions: list[RTVIAction]
class RTVIDescribeActions(BaseModel):
    """Reply to "describe-actions"; `id` is the id of the request."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["actions-available"] = "actions-available"
    id: str
    data: RTVIDescribeActionsData
class RTVIConfigResponse(BaseModel):
    """Reply carrying the processor's current configuration.

    Sent for "get-config" and after "update-config"; `id` is the request id.
    """

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["config"] = "config"
    id: str
    data: RTVIConfig
class RTVIActionResponseData(BaseModel):
    """Payload of an "action-response" message: the action handler's result."""

    result: ActionResult
class RTVIActionResponse(BaseModel):
    """Reply to an "action" request; `id` is the id of the request."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["action-response"] = "action-response"
    id: str
    data: RTVIActionResponseData
class RTVIBotReadyData(BaseModel):
    """Payload of a "bot-ready" message: protocol version and service config."""

    version: str
    config: list[RTVIServiceConfig]
class RTVIBotReady(BaseModel):
    """Announces that the bot is ready.

    The processor fills `id` with the id of the client's "client-ready"
    message (an empty string if none has been received).
    """

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-ready"] = "bot-ready"
    id: str
    data: RTVIBotReadyData
class RTVILLMFunctionCallMessageData(BaseModel):
    """Payload of an "llm-function-call" message.

    Holds the function name, the tool call id and the call arguments.
    """

    function_name: str
    tool_call_id: str
    args: Mapping[str, Any]
class RTVILLMFunctionCallMessage(BaseModel):
    """Tells the client about an LLM function call (see `handle_function_call`)."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["llm-function-call"] = "llm-function-call"
    data: RTVILLMFunctionCallMessageData
class RTVILLMFunctionCallStartMessageData(BaseModel):
    """Payload of an "llm-function-call-start" message: the function name."""

    function_name: str
class RTVILLMFunctionCallStartMessage(BaseModel):
    """Message sent by the deprecated `RTVIProcessor.handle_function_call_start()`."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["llm-function-call-start"] = "llm-function-call-start"
    data: RTVILLMFunctionCallStartMessageData
class RTVILLMFunctionCallResultData(BaseModel):
    """Payload of an "llm-function-call-result" client message.

    The processor turns it into a `FunctionCallResultFrame`.
    """

    function_name: str
    tool_call_id: str
    arguments: dict
    result: dict | str
class RTVIBotLLMStartedMessage(BaseModel):
    """Sent by the observer when an `LLMFullResponseStartFrame` is seen."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-llm-started"] = "bot-llm-started"
class RTVIBotLLMStoppedMessage(BaseModel):
    """Sent by the observer when an `LLMFullResponseEndFrame` is seen."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-llm-stopped"] = "bot-llm-stopped"
class RTVIBotTTSStartedMessage(BaseModel):
    """Sent by the observer when a `TTSStartedFrame` is seen."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-tts-started"] = "bot-tts-started"
class RTVIBotTTSStoppedMessage(BaseModel):
    """Sent by the observer when a `TTSStoppedFrame` is seen."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-tts-stopped"] = "bot-tts-stopped"
class RTVITextMessageData(BaseModel):
    """Payload shared by the text-carrying messages: a single `text` string."""

    text: str
class RTVIBotTranscriptionMessage(BaseModel):
    """Bot text accumulated from LLM text frames up to an end of sentence."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-transcription"] = "bot-transcription"
    data: RTVITextMessageData
class RTVIBotLLMTextMessage(BaseModel):
    """Text of a single `LLMTextFrame` produced by the bot's LLM."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-llm-text"] = "bot-llm-text"
    data: RTVITextMessageData
class RTVIBotTTSTextMessage(BaseModel):
    """Text of a `TTSTextFrame`, reported once the output transport pushes it."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-tts-text"] = "bot-tts-text"
    data: RTVITextMessageData
class RTVIAudioMessageData(BaseModel):
    """Payload of a "bot-tts-audio" message.

    NOTE(review): `audio` is a string; presumably base64-encoded audio,
    mirroring the incoming "raw-audio" messages -- confirm against the sender.
    """

    audio: str
    sample_rate: int
    num_channels: int
class RTVIBotTTSAudioMessage(BaseModel):
    """Message carrying bot TTS audio data to the client."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-tts-audio"] = "bot-tts-audio"
    data: RTVIAudioMessageData
class RTVIUserTranscriptionMessageData(BaseModel):
    """Payload of a "user-transcription" message.

    `final` is true for a `TranscriptionFrame` and false for an
    `InterimTranscriptionFrame`.
    """

    text: str
    user_id: str
    timestamp: str
    final: bool
class RTVIUserTranscriptionMessage(BaseModel):
    """Final or interim transcription of the user's speech."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["user-transcription"] = "user-transcription"
    data: RTVIUserTranscriptionMessageData
class RTVIUserLLMTextMessage(BaseModel):
    """Text of the latest user message found in an LLM context frame."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["user-llm-text"] = "user-llm-text"
    data: RTVITextMessageData
class RTVIUserStartedSpeakingMessage(BaseModel):
    """Sent by the observer when a `UserStartedSpeakingFrame` is seen."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["user-started-speaking"] = "user-started-speaking"
class RTVIUserStoppedSpeakingMessage(BaseModel):
    """Sent by the observer when a `UserStoppedSpeakingFrame` is seen."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["user-stopped-speaking"] = "user-stopped-speaking"
class RTVIBotStartedSpeakingMessage(BaseModel):
    """Sent by the observer for an upstream `BotStartedSpeakingFrame`."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-started-speaking"] = "bot-started-speaking"
class RTVIBotStoppedSpeakingMessage(BaseModel):
    """Sent by the observer for an upstream `BotStoppedSpeakingFrame`."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["bot-stopped-speaking"] = "bot-stopped-speaking"
class RTVIMetricsMessage(BaseModel):
    """Metrics collected from a `MetricsFrame`.

    The observer fills `data` with lists under the keys "ttfb", "processing",
    "tokens" and "characters", including only the kinds present in the frame.
    """

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["metrics"] = "metrics"
    data: Mapping[str, Any]
class RTVIServerMessage(BaseModel):
    """Application-defined message; `data` is taken from an `RTVIServerMessageFrame`."""

    label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
    type: Literal["server-message"] = "server-message"
    data: Any
@dataclass
class RTVIServerMessageFrame(SystemFrame):
    """System frame whose `data` the observer forwards to the client.

    The observer wraps `data` in an `RTVIServerMessage`.
    """

    data: Any

    def __str__(self):
        """Return the frame name followed by its data payload."""
        return f"{self.name}(data: {self.data})"
@dataclass
class RTVIObserverParams:
    """Switches selecting which message families `RTVIObserver` sends.

    Every switch defaults to ``True``.

    Attributes:
        bot_llm_enabled (bool): Send the bot's LLM messages.
        bot_tts_enabled (bool): Send the bot's TTS messages.
        bot_speaking_enabled (bool): Send the bot's started/stopped speaking
            messages.
        user_llm_enabled (bool): Send the user's LLM input messages.
        user_speaking_enabled (bool): Send the user's started/stopped
            speaking messages.
        user_transcription_enabled (bool): Send the user's transcription
            messages.
        metrics_enabled (bool): Send metrics messages.
        errors_enabled (bool): Send error messages.
    """

    bot_llm_enabled: bool = True
    bot_tts_enabled: bool = True
    bot_speaking_enabled: bool = True
    user_llm_enabled: bool = True
    user_speaking_enabled: bool = True
    user_transcription_enabled: bool = True
    metrics_enabled: bool = True
    errors_enabled: bool = True
class RTVIObserver(BaseObserver):
    """Pipeline frame observer for RTVI server message handling.

    This observer monitors pipeline frames and converts them into appropriate
    RTVI messages for client communication. It handles various frame types
    including speech events, transcriptions, LLM responses, and TTS events.

    Note:
        This observer only handles outgoing messages. Incoming RTVI client
        messages are handled by the RTVIProcessor.

    Args:
        rtvi (RTVIProcessor): The RTVI processor to push frames to.
        params (RTVIObserverParams): Settings to enable/disable specific messages.
    """

    def __init__(
        self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs
    ):
        super().__init__(**kwargs)
        self._rtvi = rtvi
        self._params = params or RTVIObserverParams()
        # LLM text accumulated until an end of sentence is detected.
        self._bot_transcription = ""
        # Ids of reportable frames that have already been processed.
        self._frames_seen = set()
        rtvi.set_errors_enabled(self._params.errors_enabled)

    async def on_push_frame(self, data: FramePushed):
        """Process a frame being pushed through the pipeline.

        A frame is reported to the client at most once: the ids of frames that
        have been processed are remembered and later sightings are skipped.
        Frames that this observer never reports (because of their type or
        because the matching switch in the params is off) are not remembered,
        so the set of seen ids does not grow with unrelated traffic.

        Args:
            data: The push event; its `source`, `frame` and `direction`
                attributes are used.
        """
        src = data.source
        frame = data.frame
        direction = data.direction

        # If we have already seen this frame, let's skip it.
        if frame.id in self._frames_seen:
            return

        # This tells whether the frame is already processed. If false, we will
        # try again the next time we see the frame.
        mark_as_seen = True

        if (
            isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame))
            and self._params.user_speaking_enabled
        ):
            await self._handle_interruptions(frame)
        elif (
            isinstance(frame, (BotStartedSpeakingFrame, BotStoppedSpeakingFrame))
            and self._params.bot_speaking_enabled
        ):
            # Only the upstream copy is reported. A frame first seen in the
            # other direction is still marked as seen and never reported.
            if direction == FrameDirection.UPSTREAM:
                await self._handle_bot_speaking(frame)
        elif (
            isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame))
            and self._params.user_transcription_enabled
        ):
            await self._handle_user_transcriptions(frame)
        elif isinstance(frame, OpenAILLMContextFrame) and self._params.user_llm_enabled:
            await self._handle_context(frame)
        elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled:
            await self.push_transport_message_urgent(RTVIBotLLMStartedMessage())
        elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled:
            await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage())
        elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled:
            await self._handle_llm_text_frame(frame)
        elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled:
            await self.push_transport_message_urgent(RTVIBotTTSStartedMessage())
        elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
            await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage())
        elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
            if isinstance(src, BaseOutputTransport):
                message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
                await self.push_transport_message_urgent(message)
            else:
                # Not pushed by the output transport yet: leave the frame
                # unmarked so it is handled on a later sighting.
                mark_as_seen = False
        elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
            await self._handle_metrics(frame)
        elif isinstance(frame, RTVIServerMessageFrame):
            message = RTVIServerMessage(data=frame.data)
            await self.push_transport_message_urgent(message)
        else:
            # This frame is never reported, so there is nothing to
            # de-duplicate; don't remember its id.
            return

        if mark_as_seen:
            self._frames_seen.add(frame.id)

    async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True):
        """Push an urgent transport message to the RTVI processor.

        Args:
            model: The message model to send
            exclude_none: Whether to exclude None values from the model dump
        """
        frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
        await self._rtvi.push_frame(frame)

    async def _push_bot_transcription(self):
        """Send the accumulated bot text (if any) and reset the accumulator."""
        if not self._bot_transcription:
            return
        message = RTVIBotTranscriptionMessage(
            data=RTVITextMessageData(text=self._bot_transcription)
        )
        await self.push_transport_message_urgent(message)
        self._bot_transcription = ""

    async def _handle_interruptions(self, frame: Frame):
        """Send the user started/stopped speaking message matching the frame."""
        message = None
        if isinstance(frame, UserStartedSpeakingFrame):
            message = RTVIUserStartedSpeakingMessage()
        elif isinstance(frame, UserStoppedSpeakingFrame):
            message = RTVIUserStoppedSpeakingMessage()

        if message:
            await self.push_transport_message_urgent(message)

    async def _handle_bot_speaking(self, frame: Frame):
        """Send the bot started/stopped speaking message matching the frame."""
        message = None
        if isinstance(frame, BotStartedSpeakingFrame):
            message = RTVIBotStartedSpeakingMessage()
        elif isinstance(frame, BotStoppedSpeakingFrame):
            message = RTVIBotStoppedSpeakingMessage()

        if message:
            await self.push_transport_message_urgent(message)

    async def _handle_llm_text_frame(self, frame: LLMTextFrame):
        """Send the LLM text and flush the transcription at an end of sentence."""
        message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
        await self.push_transport_message_urgent(message)

        self._bot_transcription += frame.text
        if match_endofsentence(self._bot_transcription):
            await self._push_bot_transcription()

    async def _handle_user_transcriptions(self, frame: Frame):
        """Send a user transcription message, final or interim per frame type."""
        if isinstance(frame, TranscriptionFrame):
            final = True
        elif isinstance(frame, InterimTranscriptionFrame):
            final = False
        else:
            return

        message = RTVIUserTranscriptionMessage(
            data=RTVIUserTranscriptionMessageData(
                text=frame.text, user_id=frame.user_id, timestamp=frame.timestamp, final=final
            )
        )
        await self.push_transport_message_urgent(message)

    async def _handle_context(self, frame: OpenAILLMContextFrame):
        """Process LLM context frames to extract user messages for the RTVI client.

        Only the last message of the context is inspected. Any error is
        logged as a warning and otherwise ignored.
        """
        try:
            messages = frame.context.messages
            if not messages:
                return

            message = messages[-1]

            # Handle Google LLM format (protobuf objects with attributes)
            if hasattr(message, "role") and message.role == "user" and hasattr(message, "parts"):
                text = "".join(part.text for part in message.parts if hasattr(part, "text"))
                if text:
                    rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
                    await self.push_transport_message_urgent(rtvi_message)

            # Handle OpenAI format (original implementation)
            elif isinstance(message, dict):
                if message["role"] == "user":
                    content = message["content"]
                    if isinstance(content, list):
                        text = " ".join(item["text"] for item in content if "text" in item)
                    else:
                        text = content
                    rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
                    await self.push_transport_message_urgent(rtvi_message)

        except Exception as e:
            logger.warning(f"Caught an error while trying to handle context: {e}")

    async def _handle_metrics(self, frame: MetricsFrame):
        """Group the frame's metrics by kind and send them as one message."""
        metrics = {}
        for d in frame.data:
            if isinstance(d, TTFBMetricsData):
                metrics.setdefault("ttfb", []).append(d.model_dump(exclude_none=True))
            elif isinstance(d, ProcessingMetricsData):
                metrics.setdefault("processing", []).append(d.model_dump(exclude_none=True))
            elif isinstance(d, LLMUsageMetricsData):
                # Token usage is dumped from the nested `value` model.
                metrics.setdefault("tokens", []).append(d.value.model_dump(exclude_none=True))
            elif isinstance(d, TTSUsageMetricsData):
                metrics.setdefault("characters", []).append(d.model_dump(exclude_none=True))

        message = RTVIMetricsMessage(data=metrics)
        await self.push_transport_message_urgent(message)
[docs] class RTVIProcessor(FrameProcessor): def __init__( self, *, config: Optional[RTVIConfig] = None, transport: Optional[BaseTransport] = None, **kwargs, ): super().__init__(**kwargs) self._config = config or RTVIConfig(config=[]) self._bot_ready = False self._client_ready = False self._client_ready_id = "" self._errors_enabled = True self._registered_actions: Dict[str, RTVIAction] = {} self._registered_services: Dict[str, RTVIService] = {} # A task to process incoming action frames. self._action_queue = asyncio.Queue() self._action_task: Optional[asyncio.Task] = None # A task to process incoming transport messages. self._message_queue = asyncio.Queue() self._message_task: Optional[asyncio.Task] = None self._register_event_handler("on_bot_started") self._register_event_handler("on_client_ready") self._input_transport = None self._transport = transport if self._transport: input_transport = self._transport.input() if isinstance(input_transport, BaseInputTransport): self._input_transport = input_transport self._input_transport.enable_audio_in_stream_on_start(False)
[docs] def register_action(self, action: RTVIAction): id = self._action_id(action.service, action.action) self._registered_actions[id] = action
[docs] def register_service(self, service: RTVIService): self._registered_services[service.name] = service
[docs] async def set_client_ready(self): self._client_ready = True await self._call_event_handler("on_client_ready")
[docs] async def set_bot_ready(self): self._bot_ready = True await self._update_config(self._config, False) await self._send_bot_ready()
[docs] def set_errors_enabled(self, enabled: bool): self._errors_enabled = enabled
[docs] async def interrupt_bot(self): await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
[docs] async def send_error(self, error: str): await self._send_error_frame(ErrorFrame(error=error))
[docs] async def handle_message(self, message: RTVIMessage): await self._message_queue.put(message)
[docs] async def handle_function_call(self, params: FunctionCallParams): fn = RTVILLMFunctionCallMessageData( function_name=params.function_name, tool_call_id=params.tool_call_id, args=params.arguments, ) message = RTVILLMFunctionCallMessage(data=fn) await self._push_transport_message(message, exclude_none=False)
[docs] async def handle_function_call_start( self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext ): import warnings with warnings.catch_warnings(): warnings.simplefilter("always") warnings.warn( "Function `RTVIProcessor.handle_function_call_start()` is deprecated, use `RTVIProcessor.handle_function_call()` instead.", DeprecationWarning, ) fn = RTVILLMFunctionCallStartMessageData(function_name=function_name) message = RTVILLMFunctionCallStartMessage(data=fn) await self._push_transport_message(message, exclude_none=False)
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. await self.push_frame(frame, direction) await self._start(frame) elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, ErrorFrame): await self._send_error_frame(frame) await self.push_frame(frame, direction) elif isinstance(frame, TransportMessageUrgentFrame): await self._handle_transport_message(frame) # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. await self.push_frame(frame, direction) await self._stop(frame) # Data frames elif isinstance(frame, RTVIActionFrame): await self._action_queue.put(frame) # Other frames else: await self.push_frame(frame, direction)
async def _start(self, frame: StartFrame): if not self._action_task: self._action_task = self.create_task(self._action_task_handler()) if not self._message_task: self._message_task = self.create_task(self._message_task_handler()) await self._call_event_handler("on_bot_started") async def _stop(self, frame: EndFrame): await self._cancel_tasks() async def _cancel(self, frame: CancelFrame): await self._cancel_tasks() async def _cancel_tasks(self): if self._action_task: await self.cancel_task(self._action_task) self._action_task = None if self._message_task: await self.cancel_task(self._message_task) self._message_task = None async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True): frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none)) await self.push_frame(frame) async def _action_task_handler(self): while True: frame = await self._action_queue.get() self.start_watchdog() await self._handle_action(frame.message_id, frame.rtvi_action_run) self._action_queue.task_done() self.reset_watchdog() async def _message_task_handler(self): while True: message = await self._message_queue.get() self.start_watchdog() await self._handle_message(message) self._message_queue.task_done() self.reset_watchdog() async def _handle_transport_message(self, frame: TransportMessageUrgentFrame): try: transport_message = frame.message if transport_message.get("label") != RTVI_MESSAGE_LABEL: logger.warning(f"Ignoring not RTVI message: {transport_message}") return message = RTVIMessage.model_validate(transport_message) await self._message_queue.put(message) except ValidationError as e: await self.send_error(f"Invalid RTVI transport message: {e}") logger.warning(f"Invalid RTVI transport message: {e}") async def _handle_message(self, message: RTVIMessage): try: match message.type: case "client-ready": await self._handle_client_ready(message.id) case "describe-actions": await self._handle_describe_actions(message.id) case 
"describe-config": await self._handle_describe_config(message.id) case "get-config": await self._handle_get_config(message.id) case "update-config": update_config = RTVIUpdateConfig.model_validate(message.data) await self._handle_update_config(message.id, update_config) case "disconnect-bot": await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) case "action": action = RTVIActionRun.model_validate(message.data) action_frame = RTVIActionFrame(message_id=message.id, rtvi_action_run=action) await self._action_queue.put(action_frame) case "llm-function-call-result": data = RTVILLMFunctionCallResultData.model_validate(message.data) await self._handle_function_call_result(data) case "raw-audio" | "raw-audio-batch": await self._handle_audio_buffer(message.data) case _: await self._send_error_response(message.id, f"Unsupported type {message.type}") except ValidationError as e: await self._send_error_response(message.id, f"Invalid message: {e}") logger.warning(f"Invalid message: {e}") except Exception as e: await self._send_error_response(message.id, f"Exception processing message: {e}") logger.warning(f"Exception processing message: {e}") async def _handle_client_ready(self, request_id: str): logger.debug("Received client-ready") if self._input_transport: await self._input_transport.start_audio_in_streaming() self._client_ready_id = request_id await self.set_client_ready() async def _handle_audio_buffer(self, data): if not self._input_transport: return # Extract audio batch ensuring it's a list audio_list = data.get("base64AudioBatch") or [data.get("base64Audio")] try: for base64_audio in filter(None, audio_list): # Filter out None values pcm_bytes = base64.b64decode(base64_audio) frame = InputAudioRawFrame( audio=pcm_bytes, sample_rate=data["sampleRate"], num_channels=data["numChannels"], ) await self._input_transport.push_audio_frame(frame) except (KeyError, TypeError, ValueError) as e: # Handle missing keys, decoding errors, and invalid types 
logger.error(f"Error processing audio buffer: {e}") async def _handle_describe_config(self, request_id: str): services = list(self._registered_services.values()) message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services)) await self._push_transport_message(message) async def _handle_describe_actions(self, request_id: str): actions = list(self._registered_actions.values()) message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions)) await self._push_transport_message(message) async def _handle_get_config(self, request_id: str): message = RTVIConfigResponse(id=request_id, data=self._config) await self._push_transport_message(message) def _update_config_option(self, service: str, config: RTVIServiceOptionConfig): for service_config in self._config.config: if service_config.service == service: for option_config in service_config.options: if option_config.name == config.name: option_config.value = config.value return # If we couldn't find a value for this config, we simply need to # add it. 
service_config.options.append(config) async def _update_service_config(self, config: RTVIServiceConfig): service = self._registered_services[config.service] for option in config.options: handler = service._options_dict[option.name].handler await handler(self, service.name, option) self._update_config_option(service.name, option) async def _update_config(self, data: RTVIConfig, interrupt: bool): if interrupt: await self.interrupt_bot() for service_config in data.config: await self._update_service_config(service_config) async def _handle_update_config(self, request_id: str, data: RTVIUpdateConfig): await self._update_config(RTVIConfig(config=data.config), data.interrupt) await self._handle_get_config(request_id) async def _handle_function_call_result(self, data): frame = FunctionCallResultFrame( function_name=data.function_name, tool_call_id=data.tool_call_id, arguments=data.arguments, result=data.result, ) await self.push_frame(frame) async def _handle_action(self, request_id: Optional[str], data: RTVIActionRun): action_id = self._action_id(data.service, data.action) if action_id not in self._registered_actions: await self._send_error_response(request_id, f"Action {action_id} not registered") return action = self._registered_actions[action_id] arguments = {} if data.arguments: for arg in data.arguments: arguments[arg.name] = arg.value result = await action.handler(self, action.service, arguments) # Only send a response if request_id is present. 
Things that don't care about # action responses (such as webhooks) don't set a request_id if request_id: message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result)) await self._push_transport_message(message) async def _send_bot_ready(self): message = RTVIBotReady( id=self._client_ready_id, data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=self._config.config), ) await self._push_transport_message(message) async def _send_error_frame(self, frame: ErrorFrame): if self._errors_enabled: message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal)) await self._push_transport_message(message) async def _send_error_response(self, id: str, error: str): if self._errors_enabled: message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error)) await self._push_transport_message(message) def _action_id(self, service: str, action: str) -> str: return f"{service}:{action}"