# Source code for pipecat.utils.text.pattern_pair_aggregator

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import re
from typing import Awaitable, Callable, Optional, Tuple

from loguru import logger

from pipecat.utils.string import match_endofsentence
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator


class PatternMatch:
    """A complete start/end pattern pair found in aggregated text.

    An instance is built each time a whole pattern pair has been seen. It
    records which registered pair matched, the exact text that matched
    (delimiters included), and the text found between the delimiters.

    Attributes:
        pattern_id: Identifier of the pattern pair that matched.
        full_match: Matched text, including the start and end patterns.
        content: Text found between the start and end patterns.
    """

    def __init__(self, pattern_id: str, full_match: str, content: str):
        """Store the details of one pattern match.

        Args:
            pattern_id: ID of the pattern pair.
            full_match: Matched text including start and end patterns.
            content: Text between the start and end patterns.
        """
        self.pattern_id, self.full_match, self.content = pattern_id, full_match, content

    def __str__(self) -> str:
        """Describe the match by its pattern ID and inner content.

        Returns:
            A human-readable summary of the match.
        """
        ident, inner = self.pattern_id, self.content
        return f"PatternMatch(id={ident}, content={inner})"
class PatternPairAggregator(BaseTextAggregator):
    """Aggregator that identifies and processes content between pattern pairs.

    This aggregator buffers text until it can identify complete pattern pairs
    (defined by start and end patterns), processes the content between these
    patterns using registered handlers, and returns text at sentence
    boundaries. It's particularly useful for processing structured content in
    streaming text, such as XML tags, markdown formatting, or custom
    delimiters.

    Patterns spanning multiple text chunks are identified once the closing
    pattern arrives. While any pattern pair is still open, no text is flushed,
    so a sentence boundary inside an unfinished pattern does not split it.

    Matches registered with ``remove_match=False`` stay in the buffer until a
    sentence boundary flushes them. The aggregator remembers how far it has
    already dispatched such matches, so each one reaches its handler only once.
    """

    def __init__(self):
        """Initialize the pattern pair aggregator.

        Creates an empty aggregator with no patterns or handlers registered.
        """
        self._text = ""
        self._patterns = {}
        self._handlers = {}
        # Per pattern ID: offset in ``self._text`` up to which kept (not
        # removed) matches have already been passed to their handler. Kept
        # matches are re-found on every ``aggregate`` call, so without this
        # their handlers would fire repeatedly.
        self._dispatched_upto = {}

    @property
    def text(self) -> str:
        """Get the currently buffered text.

        Returns:
            The current text buffer content.
        """
        return self._text

    def add_pattern_pair(
        self, pattern_id: str, start_pattern: str, end_pattern: str, remove_match: bool = True
    ) -> "PatternPairAggregator":
        """Add a pattern pair to detect in the text.

        Registers a new pattern pair with a unique identifier. The aggregator
        will look for text that starts with the start pattern and ends with
        the end pattern, and treat the content between them as a match.
        Re-registering an existing ID replaces its definition.

        Args:
            pattern_id: Unique identifier for this pattern pair.
            start_pattern: Pattern that marks the beginning of content.
            end_pattern: Pattern that marks the end of content.
            remove_match: Whether to remove the matched content from the text.

        Returns:
            Self for method chaining.
        """
        self._patterns[pattern_id] = {
            "start": start_pattern,
            "end": end_pattern,
            "remove_match": remove_match,
        }
        # Offsets recorded under an old definition do not apply to the new one.
        self._dispatched_upto.pop(pattern_id, None)
        return self

    def on_pattern_match(
        self, pattern_id: str, handler: Callable[[PatternMatch], Awaitable[None]]
    ) -> "PatternPairAggregator":
        """Register a handler for when a pattern pair is matched.

        The handler will be called whenever a complete match for the
        specified pattern ID is found in the text.

        Args:
            pattern_id: ID of the pattern pair to match.
            handler: Function to call when pattern is matched.
                The function should accept a PatternMatch object.

        Returns:
            Self for method chaining.
        """
        self._handlers[pattern_id] = handler
        return self

    @staticmethod
    def _compile_pattern(pattern_info: dict) -> "re.Pattern":
        """Build the regex for one complete start...end pair (content in group 1)."""
        start = re.escape(pattern_info["start"])
        end = re.escape(pattern_info["end"])
        # Non-greedy: each start pattern pairs with the nearest following end
        # pattern. Nested pairs of the same kind are NOT balanced.
        return re.compile(f"{start}(.*?){end}", re.DOTALL)

    def _shift_dispatched_offsets(self, start: int, end: int) -> None:
        """Adjust dispatch offsets after ``self._text[start:end]`` is deleted."""
        removed = end - start
        self._dispatched_upto = {
            # Offsets past the deleted span move left. Offsets inside it clamp
            # to its start. Offsets before it are unchanged.
            pid: upto - removed if upto >= end else min(upto, start)
            for pid, upto in self._dispatched_upto.items()
        }

    async def _process_complete_patterns(self, text: str) -> Tuple[str, bool]:
        """Process all complete pattern pairs in the text.

        Searches for all complete pattern pairs in the text, calls the
        appropriate handlers, and optionally removes the matches. Matches of
        patterns registered with ``remove_match=False`` that were already
        dispatched on an earlier call are skipped.

        ``text`` is expected to be the current buffer (``self._text``): the
        dispatch offsets kept on the instance are interpreted and updated
        relative to it.

        Args:
            text: The text to process.

        Returns:
            Tuple of (processed_text, was_modified) where:
            - processed_text is the text after processing patterns
            - was_modified indicates whether any changes were made
        """
        processed_text = text
        modified = False

        for pattern_id, pattern_info in self._patterns.items():
            regex = self._compile_pattern(pattern_info)
            remove_match = pattern_info["remove_match"]

            # Snapshot the matches, because removals below rewrite processed_text.
            matches = list(regex.finditer(processed_text))
            # Characters already deleted before the current match by this loop.
            removed_so_far = 0

            for match in matches:
                start = match.start() - removed_so_far
                end = match.end() - removed_so_far

                if not remove_match and end <= self._dispatched_upto.get(pattern_id, 0):
                    # Kept match already handed to its handler on an earlier call.
                    continue

                pattern_match = PatternMatch(
                    pattern_id=pattern_id, full_match=match.group(0), content=match.group(1)
                )

                # Call the appropriate handler if registered. A failing handler
                # is logged and does not stop aggregation.
                handler = self._handlers.get(pattern_id)
                if handler is not None:
                    try:
                        await handler(pattern_match)
                    except Exception as e:
                        logger.error(f"Error in pattern handler for {pattern_id}: {e}")

                if remove_match:
                    # Remove exactly the matched span, not the first equal substring.
                    processed_text = processed_text[:start] + processed_text[end:]
                    self._shift_dispatched_offsets(start, end)
                    removed_so_far += end - start
                    modified = True
                else:
                    self._dispatched_upto[pattern_id] = end

        return processed_text, modified

    def _has_incomplete_patterns(self, text: str) -> bool:
        """Check if text contains incomplete pattern pairs.

        Complete pairs are found with the same regex used for matching and
        deleted from a scratch copy of ``text``. If any start pattern is
        still present, it has no matching end pattern yet. This works when
        the start and end delimiters are identical (e.g. ``**``), and when an
        end pattern appears before an open start pattern.

        Args:
            text: The text to check.

        Returns:
            True if there are incomplete patterns, False otherwise.
        """
        for pattern_info in self._patterns.values():
            leftover = self._compile_pattern(pattern_info).sub("", text)
            if pattern_info["start"] in leftover:
                return True
        return False

    async def aggregate(self, text: str) -> Optional[str]:
        """Aggregate text and process pattern pairs.

        This method adds the new text to the buffer, processes any complete
        pattern pairs, and returns processed text up to sentence boundaries if
        possible. If there are incomplete patterns (start without matching
        end), it will continue buffering text.

        Args:
            text: New text to add to the buffer.

        Returns:
            Processed text up to a sentence boundary, or None if more
            text is needed to form a complete sentence or pattern.
        """
        self._text += text

        # Process any complete patterns in the buffer.
        processed_text, modified = await self._process_complete_patterns(self._text)
        if modified:
            self._text = processed_text

        # Hold everything back while a pattern pair is still open.
        if self._has_incomplete_patterns(self._text):
            return None

        eos_marker = match_endofsentence(self._text)
        if eos_marker:
            result = self._text[:eos_marker]
            self._text = self._text[eos_marker:]
            # The flushed prefix is gone, so rebase dispatch offsets on the new buffer.
            self._shift_dispatched_offsets(0, eos_marker)
            return result

        # No complete sentence found yet.
        return None

    async def handle_interruption(self):
        """Handle interruptions by clearing the buffer.

        Called when an interruption occurs in the processing pipeline, to
        reset the state and discard any partially aggregated text.
        """
        self._text = ""
        self._dispatched_upto = {}

    async def reset(self):
        """Clear the internally aggregated text.

        Resets the aggregator to its initial state, discarding any buffered
        text.
        """
        self._text = ""
        self._dispatched_upto = {}