Context
- class pipecat.services.aws_nova_sonic.context.Role(*values)[source]
Bases: Enum
- SYSTEM = 'SYSTEM'
- USER = 'USER'
- ASSISTANT = 'ASSISTANT'
- TOOL = 'TOOL'
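The four roles map one-to-one onto string values, so members round-trip cleanly through serialization. A minimal sketch, using a stdlib mirror of the enum rather than importing the real class:

```python
from enum import Enum

# Illustrative mirror of pipecat.services.aws_nova_sonic.context.Role.
class Role(Enum):
    SYSTEM = "SYSTEM"
    USER = "USER"
    ASSISTANT = "ASSISTANT"
    TOOL = "TOOL"

# Members can be recovered from their string values.
assert Role("USER") is Role.USER
assert Role.TOOL.value == "TOOL"
```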
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicConversationHistoryMessage(role: pipecat.services.aws_nova_sonic.context.Role, text: str)[source]
Bases: object
- Parameters:
role (Role)
text (str)
- role: Role
- text: str
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicConversationHistory(system_instruction: str = None, messages: list[pipecat.services.aws_nova_sonic.context.AWSNovaSonicConversationHistoryMessage] = <factory>)[source]
Bases: object
- Parameters:
system_instruction (str)
messages (list[AWSNovaSonicConversationHistoryMessage])
- system_instruction: str = None
- messages: list[AWSNovaSonicConversationHistoryMessage]
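The two history types are plain dataclasses: an optional system instruction plus an ordered list of role/text messages. A sketch of how they compose, using stdlib stand-ins mirroring the signatures above (not an import of the actual classes):

```python
from dataclasses import dataclass, field
from enum import Enum

class Role(Enum):
    SYSTEM = "SYSTEM"
    USER = "USER"
    ASSISTANT = "ASSISTANT"
    TOOL = "TOOL"

@dataclass
class AWSNovaSonicConversationHistoryMessage:
    role: Role
    text: str

@dataclass
class AWSNovaSonicConversationHistory:
    system_instruction: str = None
    messages: list = field(default_factory=list)

# Build a small history: a system instruction plus one user turn.
history = AWSNovaSonicConversationHistory(system_instruction="Be brief.")
history.messages.append(
    AWSNovaSonicConversationHistoryMessage(role=Role.USER, text="Hello")
)
```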
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicLLMContext(messages=None, tools=None, **kwargs)[source]
Bases: OpenAILLMContext
- static upgrade_to_nova_sonic(obj, system_instruction)[source]
- Parameters:
obj (OpenAILLMContext)
system_instruction (str)
- Return type:
AWSNovaSonicLLMContext
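`upgrade_to_nova_sonic` takes an existing `OpenAILLMContext` and returns it as an `AWSNovaSonicLLMContext`. One common way to implement such an in-place upgrade is to reassign the object's class so that references held elsewhere in the pipeline keep pointing at the same object. The sketch below illustrates that pattern with placeholder classes; the class internals are assumptions, not pipecat's actual code:

```python
class OpenAILLMContext:  # stand-in for the real pipecat class
    def __init__(self, messages=None):
        self.messages = messages or []

class AWSNovaSonicLLMContext(OpenAILLMContext):
    @staticmethod
    def upgrade_to_nova_sonic(obj, system_instruction):
        # Re-class the existing context in place (illustrative pattern),
        # then attach the Nova Sonic-specific system instruction.
        if isinstance(obj, OpenAILLMContext) and not isinstance(
            obj, AWSNovaSonicLLMContext
        ):
            obj.__class__ = AWSNovaSonicLLMContext
            obj.system_instruction = system_instruction
        return obj

ctx = OpenAILLMContext(messages=[{"role": "user", "content": "Hi"}])
upgraded = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(ctx, "Be helpful.")
```

The upgrade returns the very same object, now carrying the subclass behavior, so prior holders of `ctx` see the change too.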
- get_messages_for_initializing_history()[source]
- Return type:
AWSNovaSonicConversationHistory
- get_messages_for_persistent_storage()[source]
- from_standard_message(message)[source]
Convert from OpenAI message format to OpenAI message format (passthrough).
OpenAI's format allows both simple string content and structured content:
- Simple: {"role": "user", "content": "Hello"}
- Structured: {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
Since OpenAI is our standard format, this is a passthrough function.
- Parameters:
message (dict) – Message in OpenAI format
- Returns:
Same message, unchanged
- Return type:
dict
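Both content shapes from the docstring, shown as concrete dicts. Since the standard format is already OpenAI's, the passthrough can be sketched as an identity function:

```python
# The two content shapes accepted by the OpenAI message format.
simple = {"role": "user", "content": "Hello"}
structured = {"role": "user", "content": [{"type": "text", "text": "Hello"}]}

def from_standard_message(message: dict) -> dict:
    # Passthrough: the standard format is already OpenAI's format,
    # so the message is returned unchanged.
    return message

assert from_standard_message(simple) == simple
assert from_standard_message(structured) is structured
```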
- buffer_user_text(text)[source]
- flush_aggregated_user_text()[source]
- Return type:
str
- buffer_assistant_text(text)[source]
- flush_aggregated_assistant_text()[source]
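The `buffer_*`/`flush_*` method pairs suggest an accumulate-then-drain pattern for streaming transcript chunks. A hedged sketch of that pattern for the user side; the internal attribute and the space-joining behavior are assumptions, not pipecat's actual implementation:

```python
class TextAggregator:
    """Minimal sketch of the buffer/flush pattern; illustrative only."""

    def __init__(self):
        self._user_parts: list[str] = []

    def buffer_user_text(self, text: str) -> None:
        # Accumulate partial transcript chunks as they stream in.
        self._user_parts.append(text)

    def flush_aggregated_user_text(self) -> str:
        # Join everything buffered so far and reset the buffer.
        aggregated = " ".join(self._user_parts)
        self._user_parts = []
        return aggregated

agg = TextAggregator()
agg.buffer_user_text("Hello,")
agg.buffer_user_text("world!")
```

`flush_aggregated_assistant_text` would follow the same shape for the assistant side.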
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicMessagesUpdateFrame(context: pipecat.services.aws_nova_sonic.context.AWSNovaSonicLLMContext)[source]
Bases: DataFrame
- Parameters:
context (AWSNovaSonicLLMContext)
- context: AWSNovaSonicLLMContext
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicUserContextAggregator(context, *, params=None, **kwargs)[source]
Bases: OpenAIUserContextAggregator
- Parameters:
context (OpenAILLMContext)
params (LLMUserAggregatorParams | None)
- async process_frame(frame, direction=FrameDirection.DOWNSTREAM)[source]
- Parameters:
frame (Frame)
direction (FrameDirection)
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicAssistantContextAggregator(context, *, params=None, **kwargs)[source]
Bases: OpenAIAssistantContextAggregator
- Parameters:
context (OpenAILLMContext)
params (LLMAssistantAggregatorParams | None)
- async process_frame(frame, direction)[source]
- Parameters:
frame (Frame)
direction (FrameDirection)
- async handle_function_call_result(frame)[source]
Handle the result of a function call.
Updates the context with the function call result, replacing any previous IN_PROGRESS status.
- Parameters:
frame (FunctionCallResultFrame) – Frame containing the function call result.
- class pipecat.services.aws_nova_sonic.context.AWSNovaSonicContextAggregatorPair(_user: pipecat.services.aws_nova_sonic.context.AWSNovaSonicUserContextAggregator, _assistant: pipecat.services.aws_nova_sonic.context.AWSNovaSonicAssistantContextAggregator)[source]
Bases: object
- Parameters:
_user (AWSNovaSonicUserContextAggregator)
_assistant (AWSNovaSonicAssistantContextAggregator)
- user()[source]
- Return type:
AWSNovaSonicUserContextAggregator
- assistant()[source]
- Return type:
AWSNovaSonicAssistantContextAggregator
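The pair is a small container whose `user()` and `assistant()` accessors expose the two aggregators, typically so both can be placed into a pipeline. A stdlib sketch of that shape, with placeholder aggregator types standing in for the real classes:

```python
from dataclasses import dataclass

class UserAggregator: ...       # stand-in for AWSNovaSonicUserContextAggregator
class AssistantAggregator: ...  # stand-in for AWSNovaSonicAssistantContextAggregator

@dataclass
class ContextAggregatorPair:
    _user: UserAggregator
    _assistant: AssistantAggregator

    def user(self) -> UserAggregator:
        return self._user

    def assistant(self) -> AssistantAggregator:
        return self._assistant

pair = ContextAggregatorPair(UserAggregator(), AssistantAggregator())
```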