Source code for pipecat.services.simli.video

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio

import numpy as np
from loguru import logger

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    OutputImageRawFrame,
    StartInterruptionFrame,
    TTSAudioRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame

try:
    from av.audio.frame import AudioFrame
    from av.audio.resampler import AudioResampler
    from simli import SimliClient, SimliConfig
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Simli, you need to `pip install pipecat-ai[simli]`.")
    raise Exception(f"Missing module: {e}")


[docs] class SimliVideoService(FrameProcessor): def __init__( self, simli_config: SimliConfig, use_turn_server: bool = False, latency_interval: int = 0, ): super().__init__() self._simli_client = SimliClient(simli_config, use_turn_server, latency_interval) self._pipecat_resampler_event = asyncio.Event() self._pipecat_resampler: AudioResampler = None self._simli_resampler = AudioResampler("s16", "mono", 16000) self._initialized = False self._audio_task: asyncio.Task = None self._video_task: asyncio.Task = None async def _start_connection(self): if not self._initialized: await self._simli_client.Initialize() self._initialized = True # Create task to consume and process audio and video if not self._audio_task: self._audio_task = self.create_task(self._consume_and_process_audio()) if not self._video_task: self._video_task = self.create_task(self._consume_and_process_video()) async def _consume_and_process_audio(self): await self._pipecat_resampler_event.wait() async for audio_frame in self._simli_client.getAudioStreamIterator(): self.start_watchdog() resampled_frames = self._pipecat_resampler.resample(audio_frame) for resampled_frame in resampled_frames: audio_array = resampled_frame.to_ndarray() # Only push frame is there is audio (e.g. not silence) if audio_array.any(): await self.push_frame( TTSAudioRawFrame( audio=audio_array.tobytes(), sample_rate=self._pipecat_resampler.rate, num_channels=1, ), ) self.reset_watchdog() async def _consume_and_process_video(self): await self._pipecat_resampler_event.wait() async for video_frame in self._simli_client.getVideoStreamIterator(targetFormat="rgb24"): self.start_watchdog() # Process the video frame convertedFrame: OutputImageRawFrame = OutputImageRawFrame( image=video_frame.to_rgb().to_image().tobytes(), size=(video_frame.width, video_frame.height), format="RGB", ) convertedFrame.pts = video_frame.pts await self.push_frame(convertedFrame) self.reset_watchdog()
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if isinstance(frame, StartFrame): await self.push_frame(frame, direction) await self._start_connection() elif isinstance(frame, TTSAudioRawFrame): # Send audio frame to Simli try: old_frame = AudioFrame.from_ndarray( np.frombuffer(frame.audio, dtype=np.int16)[None, :], layout="mono" if frame.num_channels == 1 else "stereo", ) old_frame.sample_rate = frame.sample_rate if self._pipecat_resampler is None: self._pipecat_resampler = AudioResampler( "s16", old_frame.layout, old_frame.sample_rate ) self._pipecat_resampler_event.set() resampled_frames = self._simli_resampler.resample(old_frame) for resampled_frame in resampled_frames: await self._simli_client.send( resampled_frame.to_ndarray().astype(np.int16).tobytes() ) except Exception as e: logger.exception(f"{self} exception: {e}") elif isinstance(frame, (EndFrame, CancelFrame)): await self._stop() await self.push_frame(frame, direction) elif isinstance(frame, StartInterruptionFrame): await self._simli_client.clearBuffer() await self.push_frame(frame, direction) else: await self.push_frame(frame, direction)
async def _stop(self): await self._simli_client.stop() if self._audio_task: await self.cancel_task(self._audio_task) self._audio_task = None if self._video_task: await self.cancel_task(self._video_task) self._video_task = None