Source code for pipecat.utils.text.simple_text_aggregator

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from typing import Optional

from pipecat.utils.string import match_endofsentence
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator


[docs] class SimpleTextAggregator(BaseTextAggregator): """This is a simple text aggregator. It aggregates text until an end of sentence is found. """ def __init__(self): self._text = "" @property def text(self) -> str: return self._text
[docs] async def aggregate(self, text: str) -> Optional[str]: result: Optional[str] = None self._text += text eos_end_marker = match_endofsentence(self._text) if eos_end_marker: result = self._text[:eos_end_marker] self._text = self._text[eos_end_marker:] return result
[docs] async def handle_interruption(self): self._text = ""
[docs] async def reset(self): self._text = ""