Source code for pipecat.services.deepgram.stt

#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Deepgram speech-to-text service implementation."""

from typing import AsyncGenerator, Dict, Optional

from loguru import logger

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    InterimTranscriptionFrame,
    StartFrame,
    TranscriptionFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt

try:
    from deepgram import (
        AsyncListenWebSocketClient,
        DeepgramClient,
        DeepgramClientOptions,
        ErrorResponse,
        LiveOptions,
        LiveResultResponse,
        LiveTranscriptionEvents,
    )
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`.")
    raise Exception(f"Missing module: {e}")


class DeepgramSTTService(STTService):
    """Deepgram speech-to-text service.

    Provides real-time speech recognition using Deepgram's WebSocket API.
    Supports configurable models, languages, VAD events, and various audio
    processing options.

    Args:
        api_key: Deepgram API key for authentication.
        url: Deprecated. Use base_url instead.
        base_url: Custom Deepgram API base URL.
        sample_rate: Audio sample rate. If None, uses default or live_options value.
        live_options: Deepgram LiveOptions for detailed configuration.
        addons: Additional Deepgram features to enable.
        **kwargs: Additional arguments passed to the parent STTService.
    """

    def __init__(
        self,
        *,
        api_key: str,
        url: str = "",
        base_url: str = "",
        sample_rate: Optional[int] = None,
        live_options: Optional[LiveOptions] = None,
        addons: Optional[Dict] = None,
        **kwargs,
    ):
        sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
        super().__init__(sample_rate=sample_rate, **kwargs)

        if url:
            import warnings

            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn(
                    "Parameter 'url' is deprecated, use 'base_url' instead.",
                    DeprecationWarning,
                )
            base_url = url

        default_options = LiveOptions(
            encoding="linear16",
            language=Language.EN,
            model="nova-3-general",
            channels=1,
            interim_results=True,
            smart_format=True,
            punctuate=True,
            profanity_filter=True,
            vad_events=False,
        )

        merged_options = default_options.to_dict()
        if live_options:
            default_model = default_options.model
            merged_options.update(live_options.to_dict())
            # NOTE(aleix): Fixes an issue in deepgram-sdk where `model` is initialized
            # to the string "None" instead of the value `None`.
            if "model" in merged_options and merged_options["model"] == "None":
                merged_options["model"] = default_model

        if "language" in merged_options and isinstance(merged_options["language"], Language):
            merged_options["language"] = merged_options["language"].value

        self.set_model_name(merged_options["model"])
        self._settings = merged_options
        self._addons = addons

        self._client = DeepgramClient(
            api_key,
            config=DeepgramClientOptions(
                url=base_url,
                options={"keepalive": "true"},  # verbose=logging.DEBUG
            ),
        )

        if self.vad_enabled:
            self._register_event_handler("on_speech_started")
            self._register_event_handler("on_utterance_end")

    @property
    def vad_enabled(self):
        """Check if Deepgram VAD events are enabled.

        Returns:
            True if VAD events are enabled in the current settings.
        """
        return self._settings["vad_events"]

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as the Deepgram service supports metrics generation.
        """
        return True

    async def set_model(self, model: str):
        """Set the Deepgram model and reconnect.

        Args:
            model: The Deepgram model name to use.
        """
        await super().set_model(model)
        logger.info(f"Switching STT model to: [{model}]")
        self._settings["model"] = model
        await self._disconnect()
        await self._connect()

    async def set_language(self, language: Language):
        """Set the recognition language and reconnect.

        Args:
            language: The language to use for speech recognition.
        """
        logger.info(f"Switching STT language to: [{language}]")
        self._settings["language"] = language
        await self._disconnect()
        await self._connect()

    async def start(self, frame: StartFrame):
        """Start the Deepgram STT service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        self._settings["sample_rate"] = self.sample_rate
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the Deepgram STT service.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Deepgram STT service.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        """Send audio data to Deepgram for transcription.

        Args:
            audio: Raw audio bytes to transcribe.

        Yields:
            Frame: None (transcription results come via WebSocket callbacks).
        """
        await self._connection.send(audio)
        yield None

    async def _connect(self):
        logger.debug("Connecting to Deepgram")

        self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")

        self._connection.on(
            LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
        )
        self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)

        if self.vad_enabled:
            self._connection.on(
                LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
                self._on_speech_started,
            )
            self._connection.on(
                LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
                self._on_utterance_end,
            )

        if not await self._connection.start(options=self._settings, addons=self._addons):
            logger.error(f"{self}: unable to connect to Deepgram")

    async def _disconnect(self):
        if self._connection.is_connected:
            logger.debug("Disconnecting from Deepgram")
            await self._connection.finish()

    async def start_metrics(self):
        """Start TTFB and processing metrics collection."""
        await self.start_ttfb_metrics()
        await self.start_processing_metrics()

    async def _on_error(self, *args, **kwargs):
        error: ErrorResponse = kwargs["error"]
        logger.warning(f"{self} connection error, will retry: {error}")
        await self.stop_all_metrics()
        # NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
        # because this triggers more errors internally in the Deepgram SDK. So,
        # we just forget about the previous connection and create a new one.
        await self._connect()

    async def _on_speech_started(self, *args, **kwargs):
        await self.start_metrics()
        await self._call_event_handler("on_speech_started", *args, **kwargs)

    async def _on_utterance_end(self, *args, **kwargs):
        await self._call_event_handler("on_utterance_end", *args, **kwargs)

    @traced_stt
    async def _handle_transcription(
        self, transcript: str, is_final: bool, language: Optional[Language] = None
    ):
        """Handle a transcription result with tracing."""
        pass

    async def _on_message(self, *args, **kwargs):
        result: LiveResultResponse = kwargs["result"]
        if len(result.channel.alternatives) == 0:
            return
        is_final = result.is_final
        transcript = result.channel.alternatives[0].transcript
        language = None
        if result.channel.alternatives[0].languages:
            language = result.channel.alternatives[0].languages[0]
            language = Language(language)
        if len(transcript) > 0:
            await self.stop_ttfb_metrics()
            if is_final:
                await self.push_frame(
                    TranscriptionFrame(
                        transcript,
                        "",
                        time_now_iso8601(),
                        language,
                        result=result,
                    )
                )
                await self._handle_transcription(transcript, is_final, language)
                await self.stop_processing_metrics()
            else:
                # For interim transcriptions, just push the frame without tracing
                await self.push_frame(
                    InterimTranscriptionFrame(
                        transcript,
                        "",
                        time_now_iso8601(),
                        language,
                        result=result,
                    )
                )

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames with Deepgram-specific handling.

        Args:
            frame: The frame to process.
            direction: The direction of frame processing.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStartedSpeakingFrame) and not self.vad_enabled:
            # Start metrics if Deepgram VAD is disabled & pipeline VAD has detected speech
            await self.start_metrics()
        elif isinstance(frame, UserStoppedSpeakingFrame):
            # https://developers.deepgram.com/docs/finalize
            await self._connection.finalize()
            logger.trace(f"Triggered finalize event on: {frame.name=}, {direction=}")
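
# A minimal usage sketch for the class above, assuming the API key is read from
# a hypothetical DEEPGRAM_API_KEY environment variable; the LiveOptions values
# shown mirror the defaults used in __init__, except that vad_events is enabled:
#
#     import os
#
#     stt = DeepgramSTTService(
#         api_key=os.getenv("DEEPGRAM_API_KEY", ""),
#         live_options=LiveOptions(
#             model="nova-3-general",
#             language=Language.EN,
#             vad_events=True,
#         ),
#     )
#
# With vad_events=True, the constructor registers the "on_speech_started" and
# "on_utterance_end" event handlers, and Deepgram's own VAD drives the metrics
# started in _on_speech_started rather than the pipeline's VAD (see
# process_frame above).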