#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List, Literal, Optional
from pydantic import BaseModel
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
class RTVISearchResponseMessageData(BaseModel):
    """Payload carried in the ``data`` field of a search-response RTVI message.

    None of the fields declares a default value. In this module they are
    populated from the same-named attributes of an ``LLMSearchResponseFrame``.
    """

    # Search result string; None is an accepted value.
    search_result: Optional[str]
    # Rendered content string; None is an accepted value.
    rendered_content: Optional[str]
    # Origins attached to the search response.
    origins: List[LLMSearchOrigin]
class RTVIBotLLMSearchResponseMessage(BaseModel):
    """RTVI message envelope for an LLM search response.

    ``label`` and ``type`` are fixed literals whose defaults equal their only
    permitted value, so callers only need to supply ``data``.
    """

    # Constant label; the only permitted value is "rtvi-ai".
    label: Literal["rtvi-ai"] = "rtvi-ai"
    # Constant message type discriminator.
    type: Literal["bot-llm-search-response"] = "bot-llm-search-response"
    # Search-response payload.
    data: RTVISearchResponseMessageData
class GoogleRTVIObserver(RTVIObserver):
    """RTVI observer that additionally reports LLM search responses.

    Every pushed frame is first handed to ``RTVIObserver.on_push_frame``.
    Frames that are ``LLMSearchResponseFrame`` instances are then also wrapped
    in a ``RTVIBotLLMSearchResponseMessage`` and sent with
    ``push_transport_message_urgent``.
    """

    def __init__(self, rtvi: RTVIProcessor):
        """Initialize the observer.

        Args:
            rtvi: RTVI processor, forwarded unchanged to ``RTVIObserver``.
        """
        super().__init__(rtvi)

    async def on_push_frame(self, data: FramePushed):
        """Run the base-class handling, then report search-response frames.

        Args:
            data: Frame-push event; only ``data.frame`` is inspected here.
        """
        # The base observer always sees the event first.
        await super().on_push_frame(data)

        pushed = data.frame
        if not isinstance(pushed, LLMSearchResponseFrame):
            return
        await self._handle_llm_search_response_frame(pushed)

    async def _handle_llm_search_response_frame(self, frame: LLMSearchResponseFrame):
        """Wrap the frame's search fields in an RTVI message and send it.

        Args:
            frame: Supplies ``search_result``, ``origins`` and
                ``rendered_content`` for the message payload.
        """
        payload = RTVISearchResponseMessageData(
            search_result=frame.search_result,
            origins=frame.origins,
            rendered_content=frame.rendered_content,
        )
        envelope = RTVIBotLLMSearchResponseMessage(data=payload)
        await self.push_transport_message_urgent(envelope)