#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import audioop
import numpy as np
import pyloudnorm as pyln
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
from pipecat.audio.resamplers.soxr_resampler import SOXRAudioResampler
[docs]
def create_default_resampler(**kwargs) -> BaseAudioResampler:
return SOXRAudioResampler(**kwargs)
[docs]
def mix_audio(audio1: bytes, audio2: bytes) -> bytes:
data1 = np.frombuffer(audio1, dtype=np.int16)
data2 = np.frombuffer(audio2, dtype=np.int16)
# Max length
max_length = max(len(data1), len(data2))
# Zero-pad the arrays to the same length
padded1 = np.pad(data1, (0, max_length - len(data1)), mode="constant")
padded2 = np.pad(data2, (0, max_length - len(data2)), mode="constant")
# Mix the arrays
mixed_audio = padded1.astype(np.int32) + padded2.astype(np.int32)
mixed_audio = np.clip(mixed_audio, -32768, 32767).astype(np.int16)
return mixed_audio.astype(np.int16).tobytes()
[docs]
def interleave_stereo_audio(left_audio: bytes, right_audio: bytes) -> bytes:
left = np.frombuffer(left_audio, dtype=np.int16)
right = np.frombuffer(right_audio, dtype=np.int16)
min_length = min(len(left), len(right))
left = left[:min_length]
right = right[:min_length]
stereo = np.column_stack((left, right))
return stereo.astype(np.int16).tobytes()
[docs]
def normalize_value(value, min_value, max_value):
normalized = (value - min_value) / (max_value - min_value)
normalized_clamped = max(0, min(1, normalized))
return normalized_clamped
[docs]
def calculate_audio_volume(audio: bytes, sample_rate: int) -> float:
audio_np = np.frombuffer(audio, dtype=np.int16)
audio_float = audio_np.astype(np.float64)
block_size = audio_np.size / sample_rate
meter = pyln.Meter(sample_rate, block_size=block_size)
loudness = meter.integrated_loudness(audio_float)
# Loudness goes from -20 to 80 (more or less), where -20 is quiet and 80 is
# loud.
loudness = normalize_value(loudness, -20, 80)
return loudness
[docs]
def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
return prev_value + factor * (value - prev_value)
[docs]
async def ulaw_to_pcm(
ulaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
):
# Convert μ-law to PCM
in_pcm_bytes = audioop.ulaw2lin(ulaw_bytes, 2)
# Resample
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
return out_pcm_bytes
[docs]
async def pcm_to_ulaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
# Resample
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
# Convert PCM to μ-law
out_ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
return out_ulaw_bytes
[docs]
async def alaw_to_pcm(
alaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
) -> bytes:
# Convert a-law to PCM
in_pcm_bytes = audioop.alaw2lin(alaw_bytes, 2)
# Resample
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
return out_pcm_bytes
[docs]
async def pcm_to_alaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
# Resample
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
# Convert PCM to μ-law
out_alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
return out_alaw_bytes