#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD-2-Clause
#
"""Cartesia text-to-speech service implementations."""
import base64
import json
import uuid
import warnings
from typing import AsyncGenerator, List, Optional, Union
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import AudioContextWordTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for Cartesia configuration needed
try:
import websockets
from cartesia import AsyncCartesia
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
raise Exception(f"Missing module: {e}")
def language_to_cartesia_language(language: Language) -> Optional[str]:
"""Convert a Language enum to Cartesia language code.
Args:
language: The Language enum value to convert.
Returns:
The corresponding Cartesia language code, or None if not supported.
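
    Example::

        >>> language_to_cartesia_language(Language.FR)
        'fr'
        >>> # Regional variants (e.g. Language.EN_US, value "en-US") fall back
        >>> # to their base code:
        >>> language_to_cartesia_language(Language.EN_US)
        'en'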
"""
BASE_LANGUAGES = {
Language.DE: "de",
Language.EN: "en",
Language.ES: "es",
Language.FR: "fr",
Language.HI: "hi",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.NL: "nl",
Language.PL: "pl",
Language.PT: "pt",
Language.RU: "ru",
Language.SV: "sv",
Language.TR: "tr",
Language.ZH: "zh",
}
result = BASE_LANGUAGES.get(language)
# If not found in base languages, try to find the base language from a variant
if not result:
# Convert enum value to string and get the base language part (e.g. es-ES -> es)
lang_str = str(language.value)
base_code = lang_str.split("-")[0].lower()
# Look up the base code in our supported languages
result = base_code if base_code in BASE_LANGUAGES.values() else None
return result
class CartesiaTTSService(AudioContextWordTTSService):
"""Cartesia TTS service with WebSocket streaming and word timestamps.
Provides text-to-speech using Cartesia's streaming WebSocket API.
Supports word-level timestamps, audio context management, and various voice
customization options including speed and emotion controls.
Args:
api_key: Cartesia API key for authentication.
voice_id: ID of the voice to use for synthesis.
cartesia_version: API version string for Cartesia service.
url: WebSocket URL for Cartesia TTS API.
model: TTS model to use (e.g., "sonic-2").
sample_rate: Audio sample rate. If None, uses default.
encoding: Audio encoding format.
container: Audio container format.
params: Additional input parameters for voice customization.
text_aggregator: Custom text aggregator for processing input text.
**kwargs: Additional arguments passed to the parent service.
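
    Example::

        # A minimal sketch with hypothetical values; assumes ``os`` is imported
        # and a valid Cartesia API key and voice ID are available.
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id",
            params=CartesiaTTSService.InputParams(language=Language.EN),
        )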
"""
def __init__(
self,
*,
api_key: str,
voice_id: str,
cartesia_version: str = "2025-04-16",
url: str = "wss://api.cartesia.ai/tts/websocket",
model: str = "sonic-2",
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
params: Optional[InputParams] = None,
text_aggregator: Optional[BaseTextAggregator] = None,
**kwargs,
):
# Aggregating sentences still gives cleaner-sounding results and fewer
# artifacts than streaming one word at a time. On average, waiting for a
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
# 3 model, and it's worth it for the better audio quality.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
pause_frame_processing=True,
sample_rate=sample_rate,
text_aggregator=text_aggregator or SkipTagsAggregator([("<spell>", "</spell>")]),
**kwargs,
)
params = params or CartesiaTTSService.InputParams()
self._api_key = api_key
self._cartesia_version = cartesia_version
self._url = url
self._settings = {
"output_format": {
"container": container,
"encoding": encoding,
"sample_rate": 0,
},
"language": self.language_to_service_language(params.language)
if params.language
else "en",
"speed": params.speed,
"emotion": params.emotion,
}
self.set_model_name(model)
self.set_voice(voice_id)
self._context_id = None
self._receive_task = None
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Cartesia service supports metrics generation.
"""
return True
async def set_model(self, model: str):
"""Set the TTS model.
Args:
model: The model name to use for synthesis.
"""
self._model_id = model
await super().set_model(model)
logger.info(f"Switching TTS model to: [{model}]")
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Cartesia language format.
Args:
language: The language to convert.
Returns:
The Cartesia-specific language code, or None if not supported.
"""
return language_to_cartesia_language(language)
def _build_msg(
self, text: str = "", continue_transcript: bool = True, add_timestamps: bool = True
):
        voice_config = {"mode": "id", "id": self._voice_id}
        if self._settings["emotion"]:
            warnings.warn(
                "The 'emotion' parameter in __experimental_controls is deprecated and will be removed in a future version.",
                DeprecationWarning,
                stacklevel=2,
            )
            voice_config["__experimental_controls"] = {"emotion": self._settings["emotion"]}
msg = {
"transcript": text,
"continue": continue_transcript,
"context_id": self._context_id,
"model_id": self.model_name,
"voice": voice_config,
"output_format": self._settings["output_format"],
"language": self._settings["language"],
"add_timestamps": add_timestamps,
"use_original_timestamps": False if self.model_name == "sonic" else True,
}
if self._settings["speed"]:
msg["speed"] = self._settings["speed"]
return json.dumps(msg)
async def start(self, frame: StartFrame):
"""Start the Cartesia TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Cartesia TTS service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Stop the Cartesia TTS service.
Args:
frame: The end frame.
"""
await super().cancel(frame)
await self._disconnect()
async def _connect(self):
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Cartesia")
self._websocket = await websockets.connect(
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
try:
await self.stop_all_metrics()
if self._websocket:
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._context_id = None
self._websocket = None
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
if self._context_id:
cancel_msg = json.dumps({"context_id": self._context_id, "cancel": True})
await self._get_websocket().send(cancel_msg)
self._context_id = None
async def flush_audio(self):
"""Flush any pending audio and finalize the current context."""
if not self._context_id or not self._websocket:
return
logger.trace(f"{self}: flushing audio")
msg = self._build_msg(text="", continue_transcript=False)
await self._websocket.send(msg)
self._context_id = None
async def _receive_messages(self):
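        # Cartesia sends one JSON message per event on the context: "chunk"
        # (base64-encoded audio), "timestamps" (word-level timings), "done"
        # (context finished), or "error".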
async for message in self._get_websocket():
msg = json.loads(message)
if not msg or not self.audio_context_available(msg["context_id"]):
continue
if msg["type"] == "done":
await self.stop_ttfb_metrics()
                # Sentinel entries consumed downstream to emit TTSStoppedFrame and
                # reset word-timestamp tracking for this context.
                await self.add_word_timestamps([("TTSStoppedFrame", 0), ("Reset", 0)])
await self.remove_audio_context(msg["context_id"])
elif msg["type"] == "timestamps":
await self.add_word_timestamps(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
)
elif msg["type"] == "chunk":
await self.stop_ttfb_metrics()
self.start_word_timestamps()
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["data"]),
sample_rate=self.sample_rate,
num_channels=1,
)
await self.append_to_audio_context(msg["context_id"], frame)
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
self._context_id = None
else:
logger.error(f"{self} error, unknown message type: {msg}")
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Cartesia's streaming API.
Args:
text: The text to synthesize into speech.
Yields:
Frame: Audio frames containing the synthesized speech.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.closed:
await self._connect()
if not self._context_id:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
self._context_id = str(uuid.uuid4())
await self.create_audio_context(self._context_id)
msg = self._build_msg(text=text)
try:
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
class CartesiaHttpTTSService(TTSService):
"""Cartesia HTTP-based TTS service.
Provides text-to-speech using Cartesia's HTTP API for simpler, non-streaming
synthesis. Suitable for use cases where streaming is not required and simpler
integration is preferred.
Args:
api_key: Cartesia API key for authentication.
voice_id: ID of the voice to use for synthesis.
model: TTS model to use (e.g., "sonic-2").
base_url: Base URL for Cartesia HTTP API.
cartesia_version: API version string for Cartesia service.
sample_rate: Audio sample rate. If None, uses default.
encoding: Audio encoding format.
container: Audio container format.
params: Additional input parameters for voice customization.
**kwargs: Additional arguments passed to the parent TTSService.
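
    Example::

        # A minimal sketch with hypothetical values; assumes ``os`` is imported
        # and a valid Cartesia API key and voice ID are available.
        tts = CartesiaHttpTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id",
            params=CartesiaHttpTTSService.InputParams(language=Language.EN),
        )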
"""
def __init__(
self,
*,
api_key: str,
voice_id: str,
model: str = "sonic-2",
base_url: str = "https://api.cartesia.ai",
cartesia_version: str = "2024-11-13",
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
params: Optional[InputParams] = None,
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
params = params or CartesiaHttpTTSService.InputParams()
self._api_key = api_key
self._base_url = base_url
self._cartesia_version = cartesia_version
self._settings = {
"output_format": {
"container": container,
"encoding": encoding,
"sample_rate": 0,
},
"language": self.language_to_service_language(params.language)
if params.language
else "en",
"speed": params.speed,
"emotion": params.emotion,
}
self.set_voice(voice_id)
self.set_model_name(model)
self._client = AsyncCartesia(
api_key=api_key,
base_url=base_url,
)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Cartesia HTTP service supports metrics generation.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Cartesia language format.
Args:
language: The language to convert.
Returns:
The Cartesia-specific language code, or None if not supported.
"""
return language_to_cartesia_language(language)
async def start(self, frame: StartFrame):
"""Start the Cartesia HTTP TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
async def stop(self, frame: EndFrame):
"""Stop the Cartesia HTTP TTS service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._client.close()
async def cancel(self, frame: CancelFrame):
"""Cancel the Cartesia HTTP TTS service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._client.close()
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Cartesia's HTTP API.
Args:
text: The text to synthesize into speech.
Yields:
Frame: Audio frames containing the synthesized speech.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
try:
voice_config = {"mode": "id", "id": self._voice_id}
if self._settings["emotion"]:
warnings.warn(
"The 'emotion' parameter in voice.__experimental_controls is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
voice_config["__experimental_controls"] = {"emotion": self._settings["emotion"]}
await self.start_ttfb_metrics()
payload = {
"model_id": self._model_name,
"transcript": text,
"voice": voice_config,
"output_format": self._settings["output_format"],
"language": self._settings["language"],
}
if self._settings["speed"]:
payload["speed"] = self._settings["speed"]
yield TTSStartedFrame()
            # Reuse the Cartesia client's underlying HTTP session for the raw request.
            session = await self._client._get_session()
headers = {
"Cartesia-Version": self._cartesia_version,
"X-API-Key": self._api_key,
"Content-Type": "application/json",
}
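            # POST to Cartesia's non-streaming bytes endpoint; the complete
            # synthesized audio is read from the response body in one call.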
url = f"{self._base_url}/tts/bytes"
async with session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Cartesia API error: {error_text}")
await self.push_error(ErrorFrame(f"Cartesia API error: {error_text}"))
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
audio_data = await response.read()
await self.start_tts_usage_metrics(text)
frame = TTSAudioRawFrame(
audio=audio_data,
sample_rate=self.sample_rate,
num_channels=1,
)
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()