Source code for pipecat.services.azure.stt

#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
from typing import AsyncGenerator, Optional

from loguru import logger

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    StartFrame,
    TranscriptionFrame,
)
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt

try:
    from azure.cognitiveservices.speech import (
        ResultReason,
        SpeechConfig,
        SpeechRecognizer,
    )
    from azure.cognitiveservices.speech.audio import (
        AudioStreamFormat,
        PushAudioInputStream,
    )
    from azure.cognitiveservices.speech.dialog import AudioConfig
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Azure, you need to `pip install pipecat-ai[azure]`.")
    raise Exception(f"Missing module: {e}")


class AzureSTTService(STTService):
    def __init__(
        self,
        *,
        api_key: str,
        region: str,
        language: Language = Language.EN_US,
        sample_rate: Optional[int] = None,
        **kwargs,
    ):
        super().__init__(sample_rate=sample_rate, **kwargs)

        self._speech_config = SpeechConfig(
            subscription=api_key,
            region=region,
            speech_recognition_language=language_to_azure_language(language),
        )

        self._audio_stream = None
        self._speech_recognizer = None
        self._settings = {
            "region": region,
            "language": language_to_azure_language(language),
            "sample_rate": sample_rate,
        }

    def can_generate_metrics(self) -> bool:
        return True

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        await self.start_processing_metrics()
        await self.start_ttfb_metrics()
        if self._audio_stream:
            self._audio_stream.write(audio)
        yield None

    async def start(self, frame: StartFrame):
        await super().start(frame)

        if self._audio_stream:
            return

        stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
        self._audio_stream = PushAudioInputStream(stream_format)

        audio_config = AudioConfig(stream=self._audio_stream)

        self._speech_recognizer = SpeechRecognizer(
            speech_config=self._speech_config, audio_config=audio_config
        )
        self._speech_recognizer.recognized.connect(self._on_handle_recognized)
        self._speech_recognizer.start_continuous_recognition_async()

    async def stop(self, frame: EndFrame):
        await super().stop(frame)

        if self._speech_recognizer:
            self._speech_recognizer.stop_continuous_recognition_async()

        if self._audio_stream:
            self._audio_stream.close()

    async def cancel(self, frame: CancelFrame):
        await super().cancel(frame)

        if self._speech_recognizer:
            self._speech_recognizer.stop_continuous_recognition_async()

        if self._audio_stream:
            self._audio_stream.close()

    @traced_stt
    async def _handle_transcription(
        self, transcript: str, is_final: bool, language: Optional[Language] = None
    ):
        """Handle a transcription result with tracing."""
        await self.stop_ttfb_metrics()
        await self.stop_processing_metrics()

    def _on_handle_recognized(self, event):
        if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
            language = getattr(event.result, "language", None) or self._settings.get("language")
            frame = TranscriptionFrame(
                event.result.text,
                "",
                time_now_iso8601(),
                language,
                result=event,
            )
            asyncio.run_coroutine_threadsafe(
                self._handle_transcription(event.result.text, True, language),
                self.get_event_loop(),
            )
            asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
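A minimal usage sketch (not part of the module above), assuming pipecat's standard Pipeline/PipelineRunner API. The AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environment variable names and the transport placeholder are illustrative, not required by the service.

import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.azure.stt import AzureSTTService
from pipecat.transcriptions.language import Language

stt = AzureSTTService(
    api_key=os.environ["AZURE_SPEECH_KEY"],  # placeholder variable name
    region=os.environ["AZURE_SPEECH_REGION"],  # e.g. "westus2"
    language=Language.EN_US,
    sample_rate=16000,  # must match the audio the transport delivers
)

# transport = ...  # any pipecat transport that produces input audio frames
# pipeline = Pipeline([transport.input(), stt])  # TranscriptionFrames flow downstream
# await PipelineRunner().run(PipelineTask(pipeline))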