#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Optional, Union
from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
try:
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import Runnable
except ModuleNotFoundError as e:
logger.exception("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
raise Exception(f"Missing module: {e}")
[docs]
class LangchainProcessor(FrameProcessor):
def __init__(self, chain: Runnable, transcript_key: str = "input"):
super().__init__()
self._chain = chain
self._transcript_key = transcript_key
self._participant_id: Optional[str] = None
[docs]
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMMessagesFrame):
# Messages are accumulated on the context as a list of messages.
# The last one by the human is the one we want to send to the LLM.
logger.debug(f"Got transcription frame {frame}")
text: str = frame.messages[-1]["content"]
await self._ainvoke(text.strip())
else:
await self.push_frame(frame, direction)
@staticmethod
def __get_token_value(text: Union[str, AIMessageChunk]) -> str:
match text:
case str():
return text
case AIMessageChunk():
return text.content
case _:
return ""
async def _ainvoke(self, text: str):
logger.debug(f"Invoking chain with {text}")
await self.push_frame(LLMFullResponseStartFrame())
try:
async for token in self._chain.astream(
{self._transcript_key: text},
config={"configurable": {"session_id": self._participant_id}},
):
await self.push_frame(TextFrame(self.__get_token_value(token)))
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
logger.exception(f"{self} an unknown error occurred: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())