#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD-2-Clause
#
"""Deepgram speech-to-text service implementation."""
from typing import AsyncGenerator, Dict, Optional
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt

try:
from deepgram import (
AsyncListenWebSocketClient,
DeepgramClient,
DeepgramClientOptions,
ErrorResponse,
LiveOptions,
LiveResultResponse,
LiveTranscriptionEvents,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`.")
raise Exception(f"Missing module: {e}")


class DeepgramSTTService(STTService):
"""Deepgram speech-to-text service.
Provides real-time speech recognition using Deepgram's WebSocket API.
Supports configurable models, languages, VAD events, and various audio
processing options.
Args:
api_key: Deepgram API key for authentication.
url: Deprecated. Use base_url instead.
base_url: Custom Deepgram API base URL.
sample_rate: Audio sample rate. If None, uses default or live_options value.
live_options: Deepgram LiveOptions for detailed configuration.
addons: Additional Deepgram features to enable.
**kwargs: Additional arguments passed to the parent STTService.
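
    Example::

        # A minimal usage sketch (illustrative, not part of the module):
        # assumes `os` is imported and DEEPGRAM_API_KEY is set in the
        # environment; the service is then placed in a Pipeline like any
        # other STTService.
        stt = DeepgramSTTService(
            api_key=os.getenv("DEEPGRAM_API_KEY"),
            live_options=LiveOptions(model="nova-3-general", language=Language.EN),
        )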
"""
def __init__(
self,
*,
api_key: str,
url: str = "",
base_url: str = "",
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
addons: Optional[Dict] = None,
**kwargs,
):
sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
super().__init__(sample_rate=sample_rate, **kwargs)
if url:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter 'url' is deprecated, use 'base_url' instead.",
DeprecationWarning,
)
base_url = url
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-3-general",
channels=1,
interim_results=True,
smart_format=True,
punctuate=True,
profanity_filter=True,
vad_events=False,
)
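        # Start from the defaults above and overlay any caller-provided
        # LiveOptions on top of them.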
merged_options = default_options.to_dict()
if live_options:
default_model = default_options.model
merged_options.update(live_options.to_dict())
            # NOTE(aleix): Fixes an issue in the deepgram-sdk where `model` is
            # initialized to the string "None" instead of the value `None`.
if "model" in merged_options and merged_options["model"] == "None":
merged_options["model"] = default_model
if "language" in merged_options and isinstance(merged_options["language"], Language):
merged_options["language"] = merged_options["language"].value
self.set_model_name(merged_options["model"])
self._settings = merged_options
self._addons = addons
self._client = DeepgramClient(
api_key,
config=DeepgramClientOptions(
url=base_url,
options={"keepalive": "true"}, # verbose=logging.DEBUG
),
)
if self.vad_enabled:
self._register_event_handler("on_speech_started")
self._register_event_handler("on_utterance_end")
@property
def vad_enabled(self):
"""Check if Deepgram VAD events are enabled.
Returns:
True if VAD events are enabled in the current settings.
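
        Example::

            # Illustrative registration of a VAD event handler; these events
            # are only emitted when `vad_events` is enabled in LiveOptions.
            @stt.event_handler("on_speech_started")
            async def on_speech_started(stt, *args, **kwargs):
                ...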
"""
return self._settings["vad_events"]
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Deepgram service supports metrics generation.
"""
return True
async def set_model(self, model: str):
"""Set the Deepgram model and reconnect.
Args:
model: The Deepgram model name to use.
"""
await super().set_model(model)
logger.info(f"Switching STT model to: [{model}]")
self._settings["model"] = model
await self._disconnect()
await self._connect()
async def set_language(self, language: Language):
"""Set the recognition language and reconnect.
Args:
language: The language to use for speech recognition.
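
        Example::

            # Illustrative: switch recognition to Spanish; this tears down
            # the current websocket connection and reconnects with the new
            # setting.
            await stt.set_language(Language.ES)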
"""
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
"""Start the Deepgram STT service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram STT service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram STT service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Deepgram for transcription.
Args:
audio: Raw audio bytes to transcribe.
Yields:
Frame: None (transcription results come via WebSocket callbacks).
"""
await self._connection.send(audio)
        yield None

async def _connect(self):
logger.debug("Connecting to Deepgram")
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
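        # Route events from the Deepgram websocket to our handlers; the
        # VAD-related events are only wired up when vad_events is enabled.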
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
)
self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)
if self.vad_enabled:
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
self._on_speech_started,
)
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
self._on_utterance_end,
)
if not await self._connection.start(options=self._settings, addons=self._addons):
logger.error(f"{self}: unable to connect to Deepgram")
async def _disconnect(self):
if self._connection.is_connected:
logger.debug("Disconnecting from Deepgram")
await self._connection.finish()
async def start_metrics(self):
"""Start TTFB and processing metrics collection."""
await self.start_ttfb_metrics()
        await self.start_processing_metrics()

async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,
# we just forget about the previous connection and create a new one.
        await self._connect()

async def _on_speech_started(self, *args, **kwargs):
await self.start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)
async def _on_utterance_end(self, *args, **kwargs):
await self._call_event_handler("on_utterance_end", *args, **kwargs)
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
        pass

async def _on_message(self, *args, **kwargs):
result: LiveResultResponse = kwargs["result"]
if len(result.channel.alternatives) == 0:
return
is_final = result.is_final
transcript = result.channel.alternatives[0].transcript
language = None
if result.channel.alternatives[0].languages:
language = result.channel.alternatives[0].languages[0]
language = Language(language)
if len(transcript) > 0:
await self.stop_ttfb_metrics()
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
"",
time_now_iso8601(),
language,
result=result,
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# For interim transcriptions, just push the frame without tracing
await self.push_frame(
InterimTranscriptionFrame(
transcript,
"",
time_now_iso8601(),
language,
result=result,
)
)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Deepgram-specific handling.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame) and not self.vad_enabled:
# Start metrics if Deepgram VAD is disabled & pipeline VAD has detected speech
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# https://developers.deepgram.com/docs/finalize
await self._connection.finalize()
logger.trace(f"Triggered finalize event on: {frame.name=}, {direction=}")