Source code for pipecat.services.openai_realtime_beta.azure

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from loguru import logger

from .openai import OpenAIRealtimeBetaLLMService

try:
    import websockets
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error(
        "In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
    )
    raise Exception(f"Missing module: {e}")


[docs] class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService): """Subclass of OpenAI Realtime API Service with adjustments for Azure's wss connection.""" def __init__( self, *, api_key: str, base_url: str, **kwargs, ): """Constructor takes the same arguments as the parent class, OpenAIRealtimeBetaLLMService. Note that the following are required arguments: api_key: The API key for the Azure OpenAI service. base_url: The base URL for the Azure OpenAI service. base_url should be set to the full Azure endpoint URL including the api-version and the deployment name. For example, wss://my-project.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=my-realtime-deployment """ super().__init__(base_url=base_url, api_key=api_key, **kwargs) self.api_key = api_key self.base_url = base_url async def _connect(self): try: if self._websocket: # Here we assume that if we have a websocket, we are connected. We # handle disconnections in the send/recv code paths. return logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}") self._websocket = await websockets.connect( uri=self.base_url, extra_headers={ "api-key": self.api_key, }, ) self._receive_task = self.create_task(self._receive_task_handler()) except Exception as e: logger.error(f"{self} initialization error: {e}") self._websocket = None