Source code for pipecat.processors.user_idle_processor

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import inspect
from typing import Awaitable, Callable, Union

from pipecat.frames.frames import (
    BotSpeakingFrame,
    CancelFrame,
    EndFrame,
    Frame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class UserIdleProcessor(FrameProcessor):
    """Monitors user inactivity and triggers callbacks after timeout periods.

    Starts monitoring only after the first conversation activity
    (UserStartedSpeaking or BotSpeaking).

    Args:
        callback: Function to call when user is idle. Can be either:
            - Basic callback(processor) -> None
            - Retry callback(processor, retry_count) -> bool
              Return True to continue monitoring for idle events,
              Return False to stop the idle monitoring task
        timeout: Seconds to wait before considering user idle
        **kwargs: Additional arguments passed to FrameProcessor

    Example:
        # Retry callback:
        async def handle_idle(processor: "UserIdleProcessor", retry_count: int) -> bool:
            if retry_count < 3:
                await send_reminder("Are you still there?")
                return True
            return False

        # Basic callback:
        async def handle_idle(processor: "UserIdleProcessor") -> None:
            await send_reminder("Are you still there?")

        processor = UserIdleProcessor(
            callback=handle_idle,
            timeout=5.0
        )
    """

    def __init__(
        self,
        *,
        callback: Union[
            Callable[["UserIdleProcessor"], Awaitable[None]],  # Basic
            Callable[["UserIdleProcessor", int], Awaitable[bool]],  # Retry
        ],
        timeout: float,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Both callback flavours are normalized to the retry signature once,
        # so the idle loop never has to distinguish between them.
        self._callback = self._wrap_callback(callback)
        self._timeout = timeout
        # Number of idle callbacks fired since the user last started speaking.
        self._retry_count = 0
        # True between UserStartedSpeakingFrame and UserStoppedSpeakingFrame;
        # a timeout that elapses in that window does not fire the callback.
        self._interrupted = False
        # Set on the first UserStartedSpeakingFrame / BotSpeakingFrame.
        self._conversation_started = False
        # Handle of the running idle monitor task, or None when not running.
        self._idle_task = None
        # Set on any activity frame to restart the timeout window.
        self._idle_event = asyncio.Event()

    def _wrap_callback(
        self,
        callback: Union[
            Callable[["UserIdleProcessor"], Awaitable[None]],
            Callable[["UserIdleProcessor", int], Awaitable[bool]],
        ],
    ) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]:
        """Wraps callback to support both basic and retry signatures.

        The flavour is decided from the number of parameters reported by
        ``inspect.signature``: exactly one parameter means a basic callback,
        anything else is treated as a retry callback.

        Args:
            callback: The callback function to wrap.

        Returns:
            A wrapped callback that returns bool to indicate whether to
            continue monitoring. Basic callbacks always yield True.
        """
        sig = inspect.signature(callback)
        param_count = len(sig.parameters)

        async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool:
            if param_count == 1:
                # Basic callback: has no say in whether monitoring continues.
                await callback(processor)  # type: ignore
                return True
            else:
                # Retry callback: its return value decides.
                return await callback(processor, retry_count)  # type: ignore

        return wrapper

    def _create_idle_task(self) -> None:
        """Creates the idle task if it hasn't been created yet."""
        if not self._idle_task:
            self._idle_task = self.create_task(self._idle_task_handler())

    @property
    def retry_count(self) -> int:
        """Returns the current retry count."""
        return self._retry_count

    async def _stop(self) -> None:
        """Stops and cleans up the idle monitoring task.

        Does nothing when no idle task is running. Intended to be called from
        outside the idle task (frame processing, cleanup).
        """
        if self._idle_task:
            await self.cancel_task(self._idle_task)
            self._idle_task = None

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes incoming frames and manages idle monitoring state.

        Every frame is pushed onwards in the direction it arrived. EndFrame
        and CancelFrame additionally stop the idle task before being pushed.

        Args:
            frame: The frame to process
            direction: Direction of the frame flow
        """
        await super().process_frame(frame, direction)

        # Check for end frames before processing
        if isinstance(frame, (EndFrame, CancelFrame)):
            # Stop the idle task, if it exists
            await self._stop()
            # Push the frame down the pipeline
            await self.push_frame(frame, direction)
            return

        await self.push_frame(frame, direction)

        # Start monitoring on first conversation activity
        if not self._conversation_started and isinstance(
            frame, (UserStartedSpeakingFrame, BotSpeakingFrame)
        ):
            self._conversation_started = True
            self._create_idle_task()

        # Only process these events if conversation has started
        if self._conversation_started:
            # We shouldn't call the idle callback if the user or the bot are speaking
            if isinstance(frame, UserStartedSpeakingFrame):
                self._retry_count = 0  # Reset retry count when user speaks
                self._interrupted = True
                self._idle_event.set()
            elif isinstance(frame, UserStoppedSpeakingFrame):
                self._interrupted = False
                self._idle_event.set()
            elif isinstance(frame, BotSpeakingFrame):
                self._idle_event.set()

    async def cleanup(self) -> None:
        """Cleans up resources when processor is shutting down."""
        await super().cleanup()
        # _stop() is a no-op when there is no idle task.
        await self._stop()

    async def _idle_task_handler(self) -> None:
        """Monitors for idle timeout and triggers callbacks.

        Waits up to ``timeout`` seconds for the activity event. If the wait
        times out while the user is not speaking, the retry count is
        incremented and the callback is invoked. The event is cleared after
        every wait so the next iteration starts a fresh window.

        Runs in a loop until cancelled or callback indicates completion.
        """
        while True:
            try:
                await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                if self._interrupted:
                    # User is mid-utterance: not idle, just wait again.
                    continue
                self._retry_count += 1
                should_continue = await self._callback(self, self._retry_count)
                if not should_continue:
                    # Finish by returning instead of calling _stop(): _stop()
                    # passes self._idle_task -- this very task -- to
                    # cancel_task(), i.e. the task would be cancelling itself
                    # from inside its own coroutine. Dropping the reference
                    # and returning lets the task complete normally.
                    self._idle_task = None
                    return
            finally:
                self._idle_event.clear()