#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import base64
import json
import uuid
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple, Union
import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import (
AudioContextWordTTSService,
WordTTSService,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for ElevenLabs configuration needed
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
raise Exception(f"Missing module: {e}")
ElevenLabsOutputFormat = Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"]
# Models that support language codes
# The following models are excluded as they don't support language codes:
# - eleven_flash_v2
# - eleven_turbo_v2
# - eleven_multilingual_v2
ELEVENLABS_MULTILINGUAL_MODELS = {
"eleven_flash_v2_5",
"eleven_turbo_v2_5",
}
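# start() in both services below calls output_format_from_sample_rate(), whose
# definition falls outside this excerpt. A minimal sketch, assuming it simply maps
# the configured sample rate onto one of the ElevenLabsOutputFormat values above;
# the fallback behavior is an assumption, not necessarily the upstream choice.
def output_format_from_sample_rate(sample_rate: int) -> str:
    formats = {
        16000: "pcm_16000",
        22050: "pcm_22050",
        24000: "pcm_24000",
        44100: "pcm_44100",
    }
    if sample_rate not in formats:
        logger.warning(f"Unsupported sample rate {sample_rate}, defaulting to pcm_24000")
    return formats.get(sample_rate, "pcm_24000")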
def language_to_elevenlabs_language(language: Language) -> Optional[str]:
    """Convert a pipecat Language to an ElevenLabs language code."""
    BASE_LANGUAGES = {
Language.AR: "ar",
Language.BG: "bg",
Language.CS: "cs",
Language.DA: "da",
Language.DE: "de",
Language.EL: "el",
Language.EN: "en",
Language.ES: "es",
Language.FI: "fi",
Language.FIL: "fil",
Language.FR: "fr",
Language.HI: "hi",
Language.HR: "hr",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.MS: "ms",
Language.NL: "nl",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.SV: "sv",
Language.TA: "ta",
Language.TR: "tr",
Language.UK: "uk",
Language.VI: "vi",
Language.ZH: "zh",
}
result = BASE_LANGUAGES.get(language)
# If not found in base languages, try to find the base language from a variant
if not result:
# Convert enum value to string and get the base language part (e.g. es-ES -> es)
lang_str = str(language.value)
base_code = lang_str.split("-")[0].lower()
# Look up the base code in our supported languages
result = base_code if base_code in BASE_LANGUAGES.values() else None
return result
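# A quick illustration of the variant fallback above (assuming the Language enum
# defines regional variants such as Language.EN_US; unsupported languages return None):
#
#   language_to_elevenlabs_language(Language.EN)     -> "en"
#   language_to_elevenlabs_language(Language.EN_US)  -> "en"   (variant -> base code)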
def build_elevenlabs_voice_settings(
settings: Dict[str, Any],
) -> Optional[Dict[str, Union[float, bool]]]:
"""Build voice settings dictionary for ElevenLabs based on provided settings.
Args:
settings: Dictionary containing voice settings parameters
Returns:
Dictionary of voice settings or None if no valid settings are provided
"""
voice_setting_keys = ["stability", "similarity_boost", "style", "use_speaker_boost", "speed"]
voice_settings = {}
for key in voice_setting_keys:
if key in settings and settings[key] is not None:
voice_settings[key] = settings[key]
return voice_settings or None
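# A quick illustration of the filtering above (hypothetical values): keys outside
# voice_setting_keys or set to None are dropped, and an empty result becomes None.
#
#   build_elevenlabs_voice_settings({"stability": 0.7, "style": None, "language": "en"})
#       -> {"stability": 0.7}
#   build_elevenlabs_voice_settings({"language": "en"})
#       -> None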
def calculate_word_times(
    alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
    """Compute (word, start_time) pairs from ElevenLabs WebSocket alignment data.

    Args:
        alignment_info: Alignment payload with "chars" and "charStartTimesMs" keys.
        cumulative_time: Offset (in seconds) at which this utterance starts.

    Returns:
        List of (word, timestamp) pairs, with timestamps in seconds.
    """
zipped_times = list(zip(alignment_info["chars"], alignment_info["charStartTimesMs"]))
words = "".join(alignment_info["chars"]).split(" ")
    # Calculate start time for each word. We do this by finding a space character
    # and using the previous character's start time, also taking into account that
    # there might not be a space at the end.
times = []
for i, (a, b) in enumerate(zipped_times):
if a == " " or i == len(zipped_times) - 1:
t = cumulative_time + (zipped_times[i - 1][1] / 1000.0)
times.append(t)
word_times = list(zip(words, times))
return word_times
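# Worked example for the WebSocket alignment format consumed above (hypothetical
# timings, cumulative_time=0.0):
#
#   alignment = {"chars": ["H", "i", " ", "a", "l", "l"],
#                "charStartTimesMs": [0, 50, 100, 150, 200, 250]}
#   calculate_word_times(alignment, 0.0) -> [("Hi", 0.05), ("all", 0.2)]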
class ElevenLabsTTSService(AudioContextWordTTSService):
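    """ElevenLabs real-time TTS service using the multi-stream WebSocket API,
    with word-level timestamps and per-utterance audio contexts.
    """

    # __init__ below reads a number of fields off `params`, but the nested
    # InputParams model is defined outside this excerpt. A minimal sketch
    # reconstructed from that usage; the defaults are assumptions, not
    # necessarily the upstream values.
    class InputParams(BaseModel):
        language: Optional[Language] = None
        stability: Optional[float] = None
        similarity_boost: Optional[float] = None
        style: Optional[float] = None
        use_speaker_boost: Optional[bool] = None
        speed: Optional[float] = None
        auto_mode: Optional[bool] = True
        enable_ssml_parsing: Optional[bool] = False
        enable_logging: Optional[bool] = True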
def __init__(
self,
*,
api_key: str,
voice_id: str,
model: str = "eleven_flash_v2_5",
url: str = "wss://api.elevenlabs.io",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
**kwargs,
):
# Aggregating sentences still gives cleaner-sounding results and fewer
# artifacts than streaming one word at a time. On average, waiting for a
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
# 3 model, and it's worth it for the better audio quality.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
#
        # Finally, ElevenLabs doesn't provide information on when the bot stops
        # speaking for a while, so we want the parent class to send TTSStoppedFrame
        # after a short period of not receiving any audio.
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
params = params or ElevenLabsTTSService.InputParams()
self._api_key = api_key
self._url = url
self._settings = {
"language": self.language_to_service_language(params.language)
if params.language
else None,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
"style": params.style,
"use_speaker_boost": params.use_speaker_boost,
"speed": params.speed,
"auto_mode": str(params.auto_mode).lower(),
"enable_ssml_parsing": params.enable_ssml_parsing,
"enable_logging": params.enable_logging,
}
self.set_model_name(model)
self.set_voice(voice_id)
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
# Indicates if we have sent TTSStartedFrame. It will reset to False when
# there's an interruption or TTSStoppedFrame.
self._started = False
self._cumulative_time = 0
# Context management for v1 multi API
self._context_id = None
self._receive_task = None
self._keepalive_task = None
    def can_generate_metrics(self) -> bool:
        """Indicate that this service can generate usage metrics."""
        return True
    def language_to_service_language(self, language: Language) -> Optional[str]:
        """Convert pipecat Language to ElevenLabs language code."""
        return language_to_elevenlabs_language(language)
def _set_voice_settings(self):
return build_elevenlabs_voice_settings(self._settings)
async def set_model(self, model: str):
await super().set_model(model)
logger.info(f"Switching TTS model to: [{model}]")
await self._disconnect()
await self._connect()
async def _update_settings(self, settings: Mapping[str, Any]):
prev_voice = self._voice_id
await super()._update_settings(settings)
        if prev_voice != self._voice_id:
logger.info(f"Switching TTS voice to: [{self._voice_id}]")
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
await self._connect()
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._disconnect()
    async def flush_audio(self):
        """Ask ElevenLabs to flush any buffered audio for the current context."""
        if not self._context_id or not self._websocket:
return
logger.trace(f"{self}: flushing audio")
msg = {"context_id": self._context_id, "flush": True}
await self._websocket.send(json.dumps(msg))
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
self._started = False
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("Reset", 0)])
async def _connect(self):
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
if self._keepalive_task:
await self.cancel_task(self._keepalive_task)
self._keepalive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to ElevenLabs")
voice_id = self._voice_id
model = self.model_name
output_format = self._output_format
url = f"{self._url}/v1/text-to-speech/{voice_id}/multi-stream-input?model_id={model}&output_format={output_format}&auto_mode={self._settings['auto_mode']}"
if self._settings["enable_ssml_parsing"]:
url += f"&enable_ssml_parsing={self._settings['enable_ssml_parsing']}"
if self._settings["enable_logging"]:
url += f"&enable_logging={self._settings['enable_logging']}"
# Language can only be used with the ELEVENLABS_MULTILINGUAL_MODELS
language = self._settings["language"]
if model in ELEVENLABS_MULTILINGUAL_MODELS and language is not None:
url += f"&language_code={language}"
logger.debug(f"Using language code: {language}")
elif language is not None:
logger.warning(
f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
)
# Set max websocket message size to 16MB for large audio responses
self._websocket = await websockets.connect(
url, max_size=16 * 1024 * 1024, extra_headers={"xi-api-key": self._api_key}
)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
try:
await self.stop_all_metrics()
if self._websocket:
logger.debug("Disconnecting from ElevenLabs")
# Close all contexts and the socket
if self._context_id:
await self._websocket.send(json.dumps({"close_socket": True}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._context_id = None
self._websocket = None
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
# Close the current context when interrupted without closing the websocket
if self._context_id and self._websocket:
logger.trace(f"Closing context {self._context_id} due to interruption")
try:
                # ElevenLabs requires that Pipecat manages the contexts and closes them
                # when they're no longer in use. Since a StartInterruptionFrame is pushed
                # every time the user speaks, we'll use this as a trigger to close the
                # context and reset the state.
                # Note: We do not need to call remove_audio_context here, as the context is
                # automatically reset when super()._handle_interruption is called.
await self._websocket.send(
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"Error closing context on interruption: {e}")
self._context_id = None
self._started = False
async def _receive_messages(self):
async for message in self._get_websocket():
msg = json.loads(message)
received_ctx_id = msg.get("contextId")
# Handle final messages first, regardless of context availability
# At the moment, this message is received AFTER the close_context message is
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
if msg.get("isFinal") is True:
logger.trace(f"Received final message for context {received_ctx_id}")
continue
            # Check if this message belongs to an available audio context. Receiving a
            # message for an unavailable context should never happen, so warn about it.
if not self.audio_context_available(received_ctx_id):
logger.warning(f"Ignoring message from unavailable context: {received_ctx_id}")
continue
if msg.get("audio"):
await self.stop_ttfb_metrics()
self.start_word_timestamps()
audio = base64.b64decode(msg["audio"])
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
await self.append_to_audio_context(received_ctx_id, frame)
            if msg.get("alignment"):
                word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
                if word_times:
                    await self.add_word_timestamps(word_times)
                    self._cumulative_time = word_times[-1][1]
async def _keepalive_task_handler(self):
while True:
await asyncio.sleep(10)
try:
if self._websocket and self._websocket.open:
if self._context_id:
# Send keepalive with context ID to keep the connection alive
keepalive_message = {
"text": "",
"context_id": self._context_id,
}
logger.trace(f"Sending keepalive for context {self._context_id}")
else:
# It's possible to have a user interruption which clears the context
# without generating a new TTS response. In this case, we'll just send
# an empty message to keep the connection alive.
keepalive_message = {"text": ""}
logger.trace("Sending keepalive without context")
await self._websocket.send(json.dumps(keepalive_message))
except websockets.ConnectionClosed as e:
logger.warning(f"{self} keepalive error: {e}")
break
async def _send_text(self, text: str):
if self._websocket and self._context_id:
msg = {"text": text, "context_id": self._context_id}
await self._websocket.send(json.dumps(msg))
@traced_tts
    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using the ElevenLabs multi-stream WebSocket API.

        Creates a new audio context at the start of a TTS turn and streams subsequent
        text into it; audio and word timestamps arrive via _receive_messages().

        Args:
            text: Text to convert to speech

        Yields:
            Audio and control frames
        """
        logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.closed:
await self._connect()
try:
if not self._started:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
self._started = True
self._cumulative_time = 0
# Create new context ID and register it
self._context_id = str(uuid.uuid4())
await self.create_audio_context(self._context_id)
# Initialize context with voice settings
msg = {"text": " ", "context_id": self._context_id}
if self._voice_settings:
msg["voice_settings"] = self._voice_settings
await self._websocket.send(json.dumps(msg))
logger.trace(f"Created new context {self._context_id} with voice settings")
await self._send_text(text)
await self.start_tts_usage_metrics(text)
else:
await self._send_text(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
yield TTSStoppedFrame()
self._started = False
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
class ElevenLabsHttpTTSService(WordTTSService):
"""ElevenLabs Text-to-Speech service using HTTP streaming with word timestamps.
Args:
api_key: ElevenLabs API key
voice_id: ID of the voice to use
aiohttp_session: aiohttp ClientSession
model: Model ID (default: "eleven_flash_v2_5" for low latency)
base_url: API base URL
sample_rate: Output sample rate
params: Additional parameters for voice configuration
"""
def __init__(
self,
*,
api_key: str,
voice_id: str,
aiohttp_session: aiohttp.ClientSession,
model: str = "eleven_flash_v2_5",
base_url: str = "https://api.elevenlabs.io",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
**kwargs,
):
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
sample_rate=sample_rate,
**kwargs,
)
params = params or ElevenLabsHttpTTSService.InputParams()
self._api_key = api_key
self._base_url = base_url
self._params = params
self._session = aiohttp_session
self._settings = {
"language": self.language_to_service_language(params.language)
if params.language
else None,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
"style": params.style,
"use_speaker_boost": params.use_speaker_boost,
"speed": params.speed,
}
self.set_model_name(model)
self.set_voice(voice_id)
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
# Track cumulative time to properly sequence word timestamps across utterances
self._cumulative_time = 0
self._started = False
# Store previous text for context within a turn
self._previous_text = ""
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert pipecat Language to ElevenLabs language code."""
return language_to_elevenlabs_language(language)
def can_generate_metrics(self) -> bool:
"""Indicate that this service can generate usage metrics."""
return True
def _set_voice_settings(self):
return build_elevenlabs_voice_settings(self._settings)
def _reset_state(self):
"""Reset internal state variables."""
self._cumulative_time = 0
self._started = False
self._previous_text = ""
logger.debug(f"{self}: Reset internal state")
async def start(self, frame: StartFrame):
"""Initialize the service upon receiving a StartFrame."""
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
self._reset_state()
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if isinstance(frame, (StartInterruptionFrame, TTSStoppedFrame)):
# Reset timing on interruption or stop
self._reset_state()
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("Reset", 0)])
elif isinstance(frame, LLMFullResponseEndFrame):
# End of turn - reset previous text
self._previous_text = ""
def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]:
"""Calculate word timing from character alignment data.
Example input data:
{
"characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
"character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
"character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
}
Would produce word times (with cumulative_time=0):
[("Hello", 0.1), ("world", 0.5)]
Args:
alignment_info: Character timing data from ElevenLabs
Returns:
List of (word, timestamp) pairs
"""
chars = alignment_info.get("characters", [])
char_start_times = alignment_info.get("character_start_times_seconds", [])
if not chars or not char_start_times or len(chars) != len(char_start_times):
logger.warning(
f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}"
)
return []
# Build the words and find their start times
words = []
word_start_times = []
current_word = ""
first_char_idx = -1
for i, char in enumerate(chars):
if char == " ":
if current_word: # Only add non-empty words
words.append(current_word)
# Use time of the first character of the word, offset by cumulative time
word_start_times.append(
self._cumulative_time + char_start_times[first_char_idx]
)
current_word = ""
first_char_idx = -1
else:
if not current_word: # This is the first character of a new word
first_char_idx = i
current_word += char
# Don't forget the last word if there's no trailing space
if current_word and first_char_idx >= 0:
words.append(current_word)
word_start_times.append(self._cumulative_time + char_start_times[first_char_idx])
# Create word-time pairs
word_times = list(zip(words, word_start_times))
return word_times
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using ElevenLabs streaming API with timestamps.
Makes a request to the ElevenLabs API to generate audio and timing data.
Tracks the duration of each utterance to ensure correct sequencing.
Includes previous text as context for better prosody continuity.
Args:
text: Text to convert to speech
Yields:
Audio and control frames
"""
logger.debug(f"{self}: Generating TTS [{text}]")
# Use the with-timestamps endpoint
url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream/with-timestamps"
payload: Dict[str, Union[str, Dict[str, Union[float, bool]]]] = {
"text": text,
"model_id": self._model_name,
}
# Include previous text as context if available
if self._previous_text:
payload["previous_text"] = self._previous_text
if self._voice_settings:
payload["voice_settings"] = self._voice_settings
language = self._settings["language"]
if self._model_name in ELEVENLABS_MULTILINGUAL_MODELS and language:
payload["language_code"] = language
logger.debug(f"Using language code: {language}")
elif language:
logger.warning(
f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
)
headers = {
"xi-api-key": self._api_key,
"Content-Type": "application/json",
}
# Build query parameters
params = {
"output_format": self._output_format,
}
if self._settings["optimize_streaming_latency"] is not None:
params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]
try:
await self.start_ttfb_metrics()
async with self._session.post(
url, json=payload, headers=headers, params=params
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"{self} error: {error_text}")
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
return
await self.start_tts_usage_metrics(text)
# Start TTS sequence if not already started
if not self._started:
self.start_word_timestamps()
yield TTSStartedFrame()
self._started = True
# Track the duration of this utterance based on the last character's end time
utterance_duration = 0
async for line in response.content:
line_str = line.decode("utf-8").strip()
if not line_str:
continue
try:
# Parse the JSON object
data = json.loads(line_str)
# Process audio if present
if data and "audio_base64" in data:
await self.stop_ttfb_metrics()
audio = base64.b64decode(data["audio_base64"])
yield TTSAudioRawFrame(audio, self.sample_rate, 1)
# Process alignment if present
if data and "alignment" in data:
alignment = data["alignment"]
if alignment: # Ensure alignment is not None
# Get end time of the last character in this chunk
char_end_times = alignment.get("character_end_times_seconds", [])
if char_end_times:
chunk_end_time = char_end_times[-1]
# Update to the longest end time seen so far
utterance_duration = max(utterance_duration, chunk_end_time)
# Calculate word timestamps
word_times = self.calculate_word_times(alignment)
if word_times:
await self.add_word_timestamps(word_times)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
logger.error(f"Error processing response: {e}", exc_info=True)
continue
# After processing all chunks, add the total utterance duration
# to the cumulative time to ensure next utterance starts after this one
if utterance_duration > 0:
self._cumulative_time += utterance_duration
# Append the current text to previous_text for context continuity
# Only add a space if there's already text
if self._previous_text:
self._previous_text += " " + text
else:
self._previous_text = text
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
finally:
await self.stop_ttfb_metrics()
# Let the parent class handle TTSStoppedFrame
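# A minimal usage sketch (the environment variable names and pipeline wiring are
# assumptions, not part of this module):
#
#   tts = ElevenLabsTTSService(
#       api_key=os.getenv("ELEVENLABS_API_KEY"),
#       voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
#       params=ElevenLabsTTSService.InputParams(language=Language.EN),
#   )
#   # ...then place `tts` in a Pipecat pipeline between the LLM service and the
#   # transport output.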