Source code for pipecat.processors.transcript_processor

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from typing import List, Optional

from loguru import logger

from pipecat.frames.frames import (
    BotStoppedSpeakingFrame,
    CancelFrame,
    EndFrame,
    Frame,
    StartInterruptionFrame,
    TranscriptionFrame,
    TranscriptionMessage,
    TranscriptionUpdateFrame,
    TTSTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.time import time_now_iso8601


[docs] class BaseTranscriptProcessor(FrameProcessor): """Base class for processing conversation transcripts. Provides common functionality for handling transcript messages and updates. """ def __init__(self, **kwargs): """Initialize processor with empty message store.""" super().__init__(**kwargs) self._processed_messages: List[TranscriptionMessage] = [] self._register_event_handler("on_transcript_update") async def _emit_update(self, messages: List[TranscriptionMessage]): """Emit transcript updates for new messages. Args: messages: New messages to emit in update """ if messages: self._processed_messages.extend(messages) update_frame = TranscriptionUpdateFrame(messages=messages) await self._call_event_handler("on_transcript_update", update_frame) await self.push_frame(update_frame)
[docs] class UserTranscriptProcessor(BaseTranscriptProcessor): """Processes user transcription frames into timestamped conversation messages."""
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process TranscriptionFrames into user conversation messages. Args: frame: Input frame to process direction: Frame processing direction """ await super().process_frame(frame, direction) if isinstance(frame, TranscriptionFrame): message = TranscriptionMessage( role="user", user_id=frame.user_id, content=frame.text, timestamp=frame.timestamp ) await self._emit_update([message]) await self.push_frame(frame, direction)
[docs] class AssistantTranscriptProcessor(BaseTranscriptProcessor): """Processes assistant TTS text frames into timestamped conversation messages. This processor aggregates TTS text frames into complete utterances and emits them as transcript messages. Utterances are completed when: - The bot stops speaking (BotStoppedSpeakingFrame) - The bot is interrupted (StartInterruptionFrame) - The pipeline ends (EndFrame) Attributes: _current_text_parts: List of text fragments being aggregated for current utterance _aggregation_start_time: Timestamp when the current utterance began """ def __init__(self, **kwargs): """Initialize processor with aggregation state.""" super().__init__(**kwargs) self._current_text_parts: List[str] = [] self._aggregation_start_time: Optional[str] = None async def _emit_aggregated_text(self): """Aggregates and emits text fragments as a transcript message. This method uses a heuristic to automatically detect whether text fragments contain embedded spacing (spaces at the beginning or end of fragments) or not, and applies the appropriate joining strategy. It handles fragments from different TTS services with different formatting patterns. Examples: Fragments with embedded spacing (concatenated): ``` TTSTextFrame: ["Hello"] TTSTextFrame: [" there"] # Leading space TTSTextFrame: ["!"] TTSTextFrame: [" How"] # Leading space TTSTextFrame: ["'s"] TTSTextFrame: [" it"] # Leading space ``` Result: "Hello there! How's it" Fragments with trailing spaces (concatenated): ``` TTSTextFrame: ["Hel"] TTSTextFrame: ["lo "] # Trailing space TTSTextFrame: ["to "] # Trailing space TTSTextFrame: ["you"] ``` Result: "Hello to you" Word-by-word fragments without spacing (joined with spaces): ``` TTSTextFrame: ["Hello"] TTSTextFrame: ["there"] TTSTextFrame: ["how"] TTSTextFrame: ["are"] TTSTextFrame: ["you"] ``` Result: "Hello there how are you" """ if self._current_text_parts and self._aggregation_start_time: has_leading_spaces = any( part and part[0].isspace() for part in self._current_text_parts[1:] ) has_trailing_spaces = any( part and part[-1].isspace() for part in self._current_text_parts[:-1] ) # If there are embedded spaces in the fragments, use direct concatenation contains_spacing_between_fragments = has_leading_spaces or has_trailing_spaces # Apply corresponding joining method if contains_spacing_between_fragments: # Fragments already have spacing - just concatenate content = "".join(self._current_text_parts) else: # Word-by-word fragments - join with spaces content = " ".join(self._current_text_parts) # Clean up any excessive whitespace content = content.strip() if content: logger.trace(f"Emitting aggregated assistant message: {content}") message = TranscriptionMessage( role="assistant", content=content, timestamp=self._aggregation_start_time, ) await self._emit_update([message]) else: logger.trace("No content to emit after stripping whitespace") # Reset aggregation state self._current_text_parts = [] self._aggregation_start_time = None
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames into assistant conversation messages. Handles different frame types: - TTSTextFrame: Aggregates text for current utterance - BotStoppedSpeakingFrame: Completes current utterance - StartInterruptionFrame: Completes current utterance due to interruption - EndFrame: Completes current utterance at pipeline end - CancelFrame: Completes current utterance due to cancellation Args: frame: Input frame to process direction: Frame processing direction """ await super().process_frame(frame, direction) if isinstance(frame, (StartInterruptionFrame, CancelFrame)): # Push frame first otherwise our emitted transcription update frame # might get cleaned up. await self.push_frame(frame, direction) # Emit accumulated text with interruptions await self._emit_aggregated_text() elif isinstance(frame, TTSTextFrame): # Start timestamp on first text part if not self._aggregation_start_time: self._aggregation_start_time = time_now_iso8601() self._current_text_parts.append(frame.text) # Push frame. await self.push_frame(frame, direction) elif isinstance(frame, (BotStoppedSpeakingFrame, EndFrame)): # Emit accumulated text when bot finishes speaking or pipeline ends. await self._emit_aggregated_text() # Push frame. await self.push_frame(frame, direction) else: await self.push_frame(frame, direction)
[docs] class TranscriptProcessor: """Factory for creating and managing transcript processors. Provides unified access to user and assistant transcript processors with shared event handling. Example: ```python transcript = TranscriptProcessor() pipeline = Pipeline( [ transport.input(), stt, transcript.user(), # User transcripts context_aggregator.user(), llm, tts, transport.output(), transcript.assistant_tts(), # Assistant transcripts context_aggregator.assistant(), ] ) @transcript.event_handler("on_transcript_update") async def handle_update(processor, frame): print(f"New messages: {frame.messages}") ``` """ def __init__(self): """Initialize factory.""" self._user_processor = None self._assistant_processor = None self._event_handlers = {}
[docs] def user(self, **kwargs) -> UserTranscriptProcessor: """Get the user transcript processor. Args: **kwargs: Arguments specific to UserTranscriptProcessor """ if self._user_processor is None: self._user_processor = UserTranscriptProcessor(**kwargs) # Apply any registered event handlers for event_name, handler in self._event_handlers.items(): @self._user_processor.event_handler(event_name) async def user_handler(processor, frame): return await handler(processor, frame) return self._user_processor
[docs] def assistant(self, **kwargs) -> AssistantTranscriptProcessor: """Get the assistant transcript processor. Args: **kwargs: Arguments specific to AssistantTranscriptProcessor """ if self._assistant_processor is None: self._assistant_processor = AssistantTranscriptProcessor(**kwargs) # Apply any registered event handlers for event_name, handler in self._event_handlers.items(): @self._assistant_processor.event_handler(event_name) async def assistant_handler(processor, frame): return await handler(processor, frame) return self._assistant_processor
[docs] def event_handler(self, event_name: str): """Register event handler for both processors. Args: event_name: Name of event to handle Returns: Decorator function that registers handler with both processors """ def decorator(handler): self._event_handlers[event_name] = handler # Apply handler to existing processors if they exist if self._user_processor: @self._user_processor.event_handler(event_name) async def user_handler(processor, frame): return await handler(processor, frame) if self._assistant_processor: @self._assistant_processor.event_handler(event_name) async def assistant_handler(processor, frame): return await handler(processor, frame) return handler return decorator