Source code for pipecat.serializers.livekit

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import ctypes
import pickle

from loguru import logger

from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType

try:
    from livekit.rtc import AudioFrame
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use LiveKit, you need to `pip install pipecat-ai[livekit]`.")
    raise Exception(f"Missing module: {e}")


class LivekitFrameSerializer(FrameSerializer):
    """Frame serializer that exchanges audio as pickled LiveKit ``AudioFrame`` objects.

    Outgoing ``OutputAudioRawFrame`` instances are wrapped in a LiveKit
    ``AudioFrame`` and pickled. Incoming payloads are unpickled and converted
    into ``InputAudioRawFrame`` instances.
    """

    @property
    def type(self) -> FrameSerializerType:
        """Return the serializer type, which is always ``FrameSerializerType.BINARY``."""
        return FrameSerializerType.BINARY

    async def serialize(self, frame: Frame) -> str | bytes | None:
        """Pickle an outgoing audio frame as a LiveKit ``AudioFrame``.

        Args:
            frame: The frame to serialize. Only ``OutputAudioRawFrame``
                instances are handled.

        Returns:
            The pickled ``AudioFrame`` bytes, or ``None`` when ``frame`` is
            not an ``OutputAudioRawFrame``.
        """
        if not isinstance(frame, OutputAudioRawFrame):
            return None

        # Samples are sized as 16-bit integers. One "sample per channel"
        # therefore occupies sizeof(int16) bytes for *each* channel, so the
        # buffer length must be divided by both factors. Dividing by the
        # sample width alone yields the total sample count, which is only
        # correct for mono and over-reports multi-channel audio.
        # max(..., 1) keeps a degenerate channel count from dividing by zero.
        bytes_per_multichannel_sample = ctypes.sizeof(ctypes.c_int16) * max(
            frame.num_channels, 1
        )
        audio_frame = AudioFrame(
            data=frame.audio,
            sample_rate=frame.sample_rate,
            num_channels=frame.num_channels,
            samples_per_channel=len(frame.audio) // bytes_per_multichannel_sample,
        )
        return pickle.dumps(audio_frame)

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Unpickle an incoming payload into an ``InputAudioRawFrame``.

        The payload must unpickle to a mapping whose ``"frame"`` entry is a
        LiveKit ``AudioFrame``.

        NOTE(review): ``serialize`` pickles a bare ``AudioFrame`` rather than
        a mapping with a ``"frame"`` key, so output of ``serialize`` cannot be
        fed straight back into this method. Confirm the wire format the
        sending side uses.

        Args:
            data: The pickled payload.

        Returns:
            An ``InputAudioRawFrame`` carrying the audio bytes, sample rate
            and channel count of the unpickled ``AudioFrame``.
        """
        # SECURITY: pickle.loads can execute arbitrary code while decoding.
        # Only use this serializer on payloads that come from a trusted peer.
        audio_frame: AudioFrame = pickle.loads(data)["frame"]
        return InputAudioRawFrame(
            audio=bytes(audio_frame.data),
            sample_rate=audio_frame.sample_rate,
            num_channels=audio_frame.num_channels,
        )