# Source code for pipecat.services.neuphonic.tts

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import base64
import json
from typing import Any, AsyncGenerator, Mapping, Optional
from urllib.parse import urlencode

from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    BotStoppedSpeakingFrame,
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    LLMFullResponseEndFrame,
    StartFrame,
    StartInterruptionFrame,
    TTSAudioRawFrame,
    TTSSpeakFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts

try:
    import websockets
    from pyneuphonic import Neuphonic, TTSConfig
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Neuphonic, you need to `pip install pipecat-ai[neuphonic]`.")
    raise Exception(f"Missing module: {e}")


def language_to_neuphonic_lang_code(language: Language) -> Optional[str]:
    """Map a pipecat ``Language`` to the language code used by Neuphonic.

    An exact match in the supported-language table is tried first. Otherwise
    the base part of the language value (e.g. ``es-ES`` -> ``es``) is checked
    against the supported codes.

    Args:
        language: The language to convert.

    Returns:
        The lowercase Neuphonic language code, or ``None`` if unsupported.
    """
    # All codes are lowercase so that the variant fallback below (which
    # lowercases the base code) can match every entry, including Hindi.
    BASE_LANGUAGES = {
        Language.DE: "de",
        Language.EN: "en",
        Language.ES: "es",
        Language.NL: "nl",
        Language.AR: "ar",
        Language.FR: "fr",
        Language.PT: "pt",
        Language.RU: "ru",
        Language.HI: "hi",
        Language.ZH: "zh",
    }

    result = BASE_LANGUAGES.get(language)

    # If not found in base languages, try to find the base language from a variant
    if not result:
        # Convert enum value to string and get the base language part (e.g. es-ES -> es)
        lang_str = str(language.value)
        base_code = lang_str.split("-")[0].lower()
        # Look up the base code in our supported languages
        result = base_code if base_code in BASE_LANGUAGES.values() else None

    return result
class NeuphonicTTSService(InterruptibleTTSService):
    """Neuphonic text-to-speech service over a streaming websocket.

    Text is sent as JSON ``{"text": ...}`` messages to
    ``{url}/speak/{lang_code}``. Websocket messages carrying a base64
    ``data.audio`` field are decoded and pushed downstream as mono
    ``TTSAudioRawFrame``s.

    Args:
        api_key: Neuphonic API key (sent as a URL query parameter).
        voice_id: ID of the voice to use.
        url: Base websocket URL for the Neuphonic API (default: "wss://api.neuphonic.com").
        sample_rate: Sample rate for audio output (default: 22050Hz).
        encoding: Audio encoding format (default: "pcm_linear").
        params: Additional parameters for TTS generation including language and speed.
        **kwargs: Additional keyword arguments passed to the parent class.
    """

    class InputParams(BaseModel):
        """Optional generation settings: ``language`` and ``speed``."""

        language: Optional[Language] = Language.EN
        speed: Optional[float] = 1.0

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: Optional[str] = None,
        url: str = "wss://api.neuphonic.com",
        sample_rate: Optional[int] = 22050,
        encoding: str = "pcm_linear",
        params: Optional[InputParams] = None,
        **kwargs,
    ):
        super().__init__(
            aggregate_sentences=True,
            push_text_frames=False,
            push_stop_frames=True,
            stop_frame_timeout_s=2.0,
            sample_rate=sample_rate,
            **kwargs,
        )

        params = params or NeuphonicTTSService.InputParams()

        self._api_key = api_key
        self._url = url
        # Every non-None entry here (plus voice_id) is sent as a query parameter
        # when the websocket connects.
        self._settings = {
            "lang_code": self.language_to_service_language(params.language),
            "speed": params.speed,
            "encoding": encoding,
            "sampling_rate": sample_rate,
        }
        self.set_voice(voice_id)

        # Indicates if we have sent TTSStartedFrame. It will reset to False when
        # there's an interruption or TTSStoppedFrame.
        self._started = False
        self._cumulative_time = 0

        self._receive_task = None
        self._keepalive_task = None

    def can_generate_metrics(self) -> bool:
        """Return True: this service reports TTFB and usage metrics."""
        return True

    def language_to_service_language(self, language: Language) -> Optional[str]:
        """Convert a pipecat ``Language`` to a Neuphonic language code (or None)."""
        return language_to_neuphonic_lang_code(language)

    async def _update_settings(self, settings: Mapping[str, Any]):
        """Apply new settings, then reconnect so the new connection URL uses them."""
        if "voice_id" in settings:
            self.set_voice(settings["voice_id"])

        await super()._update_settings(settings)
        # Settings are baked into the connection URL, so a reconnect is required.
        await self._disconnect()
        await self._connect()
        logger.info(f"Switching TTS to settings: [{self._settings}]")

    async def start(self, frame: StartFrame):
        """Start the service and open the websocket connection."""
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and close the websocket connection."""
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the service and close the websocket connection."""
        await super().cancel(frame)
        await self._disconnect()

    async def flush_audio(self):
        """Send the ``<STOP>`` marker (presumably flushes text buffered by Neuphonic)."""
        if self._websocket:
            msg = {"text": "<STOP>"}
            await self._websocket.send(json.dumps(msg))

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame, clearing the started flag on stop or interruption."""
        await super().push_frame(frame, direction)
        if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
            self._started = False

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process a frame, pausing/resuming frame processing around bot speech."""
        await super().process_frame(frame, direction)

        # Pause processing more frames until a BotStoppedSpeakingFrame arrives
        # when either a TTSSpeakFrame is received, or an LLM response ends after
        # text was actually sent to TTS (``_started``). A response without text
        # (e.g. only a function call) does not pause processing.
        if isinstance(frame, TTSSpeakFrame):
            await self.pause_processing_frames()
        elif isinstance(frame, LLMFullResponseEndFrame) and self._started:
            await self.pause_processing_frames()
        elif isinstance(frame, BotStoppedSpeakingFrame):
            await self.resume_processing_frames()

    async def _connect(self):
        """Open the websocket and start the receive and keepalive tasks if needed."""
        await self._connect_websocket()

        if self._websocket and not self._receive_task:
            self._receive_task = self.create_task(self._receive_task_handler(self._report_error))

        if self._websocket and not self._keepalive_task:
            self._keepalive_task = self.create_task(self._keepalive_task_handler())

    async def _disconnect(self):
        """Cancel background tasks and close the websocket."""
        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None

        if self._keepalive_task:
            await self.cancel_task(self._keepalive_task)
            self._keepalive_task = None

        await self._disconnect_websocket()

    async def _connect_websocket(self):
        """Connect to ``{url}/speak/{lang_code}`` with settings as query parameters.

        On failure the error is logged, ``_websocket`` is reset to None and the
        ``on_connection_error`` event handler is called.
        """
        try:
            if self._websocket and self._websocket.open:
                return

            logger.debug("Connecting to Neuphonic")

            tts_config = {
                **self._settings,
                "voice_id": self._voice_id,
            }

            query_params = [("api_key", self._api_key)]
            query_params.extend(
                (key, value) for key, value in tts_config.items() if value is not None
            )
            # Percent-encode every value so reserved characters (&, =, +, #,
            # spaces) in the API key or voice ID cannot corrupt the query string.
            query = urlencode(query_params)

            url = f"{self._url}/speak/{self._settings['lang_code']}?{query}"

            self._websocket = await websockets.connect(url)
        except Exception as e:
            logger.error(f"{self} initialization error: {e}")
            self._websocket = None
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Stop metrics and close the websocket; always reset connection state."""
        try:
            await self.stop_all_metrics()

            if self._websocket:
                logger.debug("Disconnecting from Neuphonic")
                await self._websocket.close()
        except Exception as e:
            logger.error(f"{self} error closing websocket: {e}")
        finally:
            self._started = False
            self._websocket = None

    async def _receive_messages(self):
        """Decode base64 audio from incoming JSON messages and push audio frames."""
        async for message in self._websocket:
            if isinstance(message, str):
                msg = json.loads(message)
                if msg.get("data", {}).get("audio") is not None:
                    await self.stop_ttfb_metrics()
                    audio = base64.b64decode(msg["data"]["audio"])
                    frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
                    await self.push_frame(frame)

    async def _keepalive_task_handler(self):
        """Send an empty text message every 10 seconds (presumably keeps the socket alive)."""
        while True:
            await asyncio.sleep(10)
            await self._send_text("")

    async def _send_text(self, text: str):
        """Send ``text`` as a JSON ``{"text": ...}`` message if connected."""
        if self._websocket:
            msg = {"text": text}
            logger.debug(f"Sending text to websocket: {msg}")
            await self._websocket.send(json.dumps(msg))

    @traced_tts
    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        """Send ``text`` for synthesis over the websocket.

        Audio is not yielded here; it is pushed by the receive task as it
        arrives. Yields ``TTSStartedFrame`` on the first text of a turn, and
        ``TTSStoppedFrame`` (followed by a reconnect) if sending fails.
        """
        logger.debug(f"Generating TTS: [{text}]")

        try:
            if not self._websocket or self._websocket.closed:
                await self._connect()

            try:
                if not self._started:
                    await self.start_ttfb_metrics()
                    yield TTSStartedFrame()
                    self._started = True
                    self._cumulative_time = 0

                await self._send_text(text)
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                logger.error(f"{self} error sending message: {e}")
                yield TTSStoppedFrame()
                await self._disconnect()
                await self._connect()
                return
            yield None
        except Exception as e:
            logger.error(f"{self} exception: {e}")
class NeuphonicHttpTTSService(TTSService):
    """Neuphonic Text-to-Speech service using HTTP streaming.

    Args:
        api_key: Neuphonic API key
        voice_id: ID of the voice to use
        url: Base URL for the Neuphonic API (default: "https://api.neuphonic.com")
        sample_rate: Sample rate for audio output (default: 22050Hz)
        encoding: Audio encoding format (default: "pcm_linear")
        params: Additional parameters for TTS generation including language and speed
        **kwargs: Additional keyword arguments passed to the parent class
    """

    class InputParams(BaseModel):
        """Optional generation settings: ``language`` and ``speed``."""

        language: Optional[Language] = Language.EN
        speed: Optional[float] = 1.0

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: Optional[str] = None,
        url: str = "https://api.neuphonic.com",
        sample_rate: Optional[int] = 22050,
        encoding: str = "pcm_linear",
        params: Optional[InputParams] = None,
        **kwargs,
    ):
        super().__init__(sample_rate=sample_rate, **kwargs)

        params = params or NeuphonicHttpTTSService.InputParams()

        self._api_key = api_key
        self._url = url
        # Passed as keyword arguments to pyneuphonic's TTSConfig on every request.
        self._settings = {
            "lang_code": self.language_to_service_language(params.language),
            "speed": params.speed,
            "encoding": encoding,
            "sampling_rate": sample_rate,
        }
        self.set_voice(voice_id)

    def can_generate_metrics(self) -> bool:
        """Return True: this service reports TTFB and usage metrics."""
        return True

    async def start(self, frame: StartFrame):
        """Start the service (no connection is held between requests)."""
        await super().start(frame)

    async def flush_audio(self):
        """No-op: each request streams its full response, so there is nothing to flush."""
        pass

    @traced_tts
    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using Neuphonic streaming API.

        Args:
            text: The text to convert to speech

        Yields:
            ``TTSStartedFrame``, then one ``TTSAudioRawFrame`` per successful
            streamed message (an ``ErrorFrame`` for each non-200 message), and
            finally ``TTSStoppedFrame``. Any exception is reported as an
            ``ErrorFrame`` before ``TTSStoppedFrame``.
        """
        logger.debug(f"Generating TTS: [{text}]")

        try:
            # Created inside the try so construction failures are reported as
            # ErrorFrames rather than escaping the generator.
            client = Neuphonic(api_key=self._api_key, base_url=self._url.replace("https://", ""))
            sse = client.tts.AsyncSSEClient()

            await self.start_ttfb_metrics()
            response = sse.send(text, TTSConfig(**self._settings, voice_id=self._voice_id))
            await self.start_tts_usage_metrics(text)

            yield TTSStartedFrame()

            async for message in response:
                if message.status_code != 200:
                    logger.error(f"{self} error: {message.errors}")
                    yield ErrorFrame(error=f"Neuphonic API error: {message.errors}")
                    # Do not treat an error message as audio.
                    continue

                await self.stop_ttfb_metrics()
                yield TTSAudioRawFrame(message.data.audio, self.sample_rate, 1)
        except Exception as e:
            logger.error(f"Error in run_tts: {e}")
            yield ErrorFrame(error=str(e))

        # Yielded after the try rather than in ``finally``: yielding from an
        # async generator's ``finally`` while it is being closed (``aclose()``)
        # raises RuntimeError.
        yield TTSStoppedFrame()