#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from collections import deque
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
StartFrame,
UserStartedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
class TurnTrackingObserver(BaseObserver):
    """Observer that tracks conversation turns in a pipeline.

    Turn tracking logic:

    - The first turn starts immediately when the pipeline starts (StartFrame).
    - Subsequent turns start when the user starts speaking.
    - A turn ends when the bot stops speaking and either:
        - the user starts speaking again, or
        - a timeout period elapses with no more bot speech.
    - An EndFrame or CancelFrame ends any active turn (reported as interrupted).

    Events:
        on_turn_started: emitted with the new turn number.
        on_turn_ended: emitted with the turn number, the turn duration in
            seconds, and whether the turn was interrupted.

    Args:
        max_frames: Number of most recent frame IDs remembered so that a frame
            observed more than once is only handled the first time.
        turn_end_timeout_secs: Seconds to wait after the bot stops speaking
            before ending the turn, in case the bot resumes speaking.
        **kwargs: Forwarded to ``BaseObserver``.
    """

    def __init__(self, max_frames=100, turn_end_timeout_secs=2.5, **kwargs):
        super().__init__(**kwargs)
        self._turn_count = 0
        self._is_turn_active = False
        self._is_bot_speaking = False
        self._has_bot_spoken = False
        # Timestamp of the push that started the current turn; _end_turn
        # converts the difference from nanoseconds to seconds.
        self._turn_start_time = 0
        self._turn_end_timeout_secs = turn_end_timeout_secs
        # Pending "end the turn after a timeout" task, or None.
        self._end_turn_timer = None

        # Dedupe window: _processed_frames always holds exactly the IDs stored
        # in _frame_history, giving O(1) membership tests over the last
        # max_frames frames.
        self._processed_frames = set()
        self._frame_history = deque(maxlen=max_frames)

        self._register_event_handler("on_turn_started")
        self._register_event_handler("on_turn_ended")

    async def on_push_frame(self, data: FramePushed):
        """Process frame events for turn tracking.

        Each frame is handled at most once within the ``max_frames`` window.
        """
        if not self._mark_frame_processed(data.frame.id):
            return

        frame = data.frame
        if isinstance(frame, StartFrame):
            # Start the first turn immediately when the pipeline starts.
            if self._turn_count == 0:
                await self._start_turn(data)
        elif isinstance(frame, UserStartedSpeakingFrame):
            await self._handle_user_started_speaking(data)
        elif isinstance(frame, BotStartedSpeakingFrame):
            await self._handle_bot_started_speaking(data)
        # A BotStoppedSpeakingFrame can arrive after a UserStartedSpeakingFrame
        # following an interruption; only end the turn if the bot was speaking.
        elif isinstance(frame, BotStoppedSpeakingFrame) and self._is_bot_speaking:
            await self._handle_bot_stopped_speaking(data)
        elif isinstance(frame, (EndFrame, CancelFrame)):
            await self._handle_pipeline_end(data)

    def _mark_frame_processed(self, frame_id) -> bool:
        """Record ``frame_id`` as seen.

        Returns:
            False if the ID is already in the dedupe window, True otherwise.
        """
        if frame_id in self._processed_frames:
            return False
        history = self._frame_history
        if history.maxlen is not None and len(history) == history.maxlen:
            if not history:
                # max_frames == 0: nothing is remembered, nothing is deduplicated.
                return True
            # The append below evicts history[0]; forget it in the set as well
            # so both containers keep the same contents (IDs in the deque are
            # unique because only unseen IDs are ever appended).
            self._processed_frames.discard(history[0])
        history.append(frame_id)
        self._processed_frames.add(frame_id)
        return True

    def _schedule_turn_end(self, data: FramePushed):
        """Schedule the current turn to end after ``turn_end_timeout_secs``.

        Any previously scheduled turn end is cancelled first. The task is kept
        in ``_end_turn_timer`` so it stays referenced and can be cancelled.
        Must be called while an event loop is running.
        """
        self._cancel_turn_end_timer()
        self._end_turn_timer = asyncio.create_task(self._end_turn_after_timeout(data))

    def _cancel_turn_end_timer(self):
        """Cancel the pending turn-end task, if there is one."""
        if self._end_turn_timer is not None:
            self._end_turn_timer.cancel()
            self._end_turn_timer = None

    async def _end_turn_after_timeout(self, data: FramePushed):
        """Wait for the timeout, then end the turn if the bot is still silent.

        Args:
            data: The push that scheduled this timeout; its timestamp is used
                as the turn's end time.
        """
        await asyncio.sleep(self._turn_end_timeout_secs)
        # The timeout fired (a cancelled task never gets here). Detach now so a
        # later _cancel_turn_end_timer() cannot cancel this task while it runs
        # the event handlers below, and so a newer timer is never clobbered.
        self._end_turn_timer = None
        if self._is_turn_active and not self._is_bot_speaking:
            logger.trace(f"Turn {self._turn_count} ending due to timeout")
            await self._end_turn(data, was_interrupted=False)

    async def _handle_user_started_speaking(self, data: FramePushed):
        """Handle user speaking events, including interruptions."""
        if self._is_bot_speaking:
            # Interruption: end the current turn and start a new one.
            self._cancel_turn_end_timer()
            await self._end_turn(data, was_interrupted=True)
            self._is_bot_speaking = False  # Bot is considered interrupted
            await self._start_turn(data)
        elif self._is_turn_active and self._has_bot_spoken:
            # User spoke during the turn_end_timeout_secs window after bot speech.
            self._cancel_turn_end_timer()
            await self._end_turn(data, was_interrupted=False)
            await self._start_turn(data)
        elif not self._is_turn_active:
            # Start a new turn after the previous one ended.
            await self._start_turn(data)
        else:
            # User is speaking within the same turn (before the bot responded).
            logger.trace(f"User is already speaking in Turn {self._turn_count}")

    async def _handle_bot_started_speaking(self, data: FramePushed):
        """Handle bot speaking events."""
        self._is_bot_speaking = True
        self._has_bot_spoken = True
        # The bot resumed speaking, so the turn is not over yet.
        self._cancel_turn_end_timer()

    async def _handle_bot_stopped_speaking(self, data: FramePushed):
        """Handle bot stopped speaking events."""
        self._is_bot_speaking = False
        # End the turn only after a timeout: the bot's speech can stop and then
        # resume (e.g. with HTTP TTS services or function calls).
        self._schedule_turn_end(data)

    async def _handle_pipeline_end(self, data: FramePushed):
        """Handle pipeline end or cancellation by flushing any active turn."""
        # Always drop a pending timeout so no task outlives the pipeline.
        self._cancel_turn_end_timer()
        if self._is_turn_active:
            await self._end_turn(data, was_interrupted=True)

    async def _start_turn(self, data: FramePushed):
        """Start a new turn and emit ``on_turn_started``."""
        self._is_turn_active = True
        self._has_bot_spoken = False
        self._turn_count += 1
        self._turn_start_time = data.timestamp
        logger.trace(f"Turn {self._turn_count} started")
        await self._call_event_handler("on_turn_started", self._turn_count)

    async def _end_turn(self, data: FramePushed, was_interrupted: bool):
        """End the current turn (no-op if none is active) and emit ``on_turn_ended``."""
        if not self._is_turn_active:
            return
        duration = (data.timestamp - self._turn_start_time) / 1_000_000_000  # ns -> s
        self._is_turn_active = False
        status = "interrupted" if was_interrupted else "completed"
        logger.trace(f"Turn {self._turn_count} {status} after {duration:.2f}s")
        await self._call_event_handler("on_turn_ended", self._turn_count, duration, was_interrupted)