Context

class pipecat.services.aws_nova_sonic.context.Role(*values)[source]

Bases: Enum

SYSTEM = 'SYSTEM'
USER = 'USER'
ASSISTANT = 'ASSISTANT'
TOOL = 'TOOL'
class pipecat.services.aws_nova_sonic.context.AWSNovaSonicConversationHistoryMessage(role: pipecat.services.aws_nova_sonic.context.Role, text: str)[source]

Bases: object

Parameters:
  • role (Role)

  • text (str)

role: Role
text: str
class pipecat.services.aws_nova_sonic.context.AWSNovaSonicConversationHistory(system_instruction: str = None, messages: list[pipecat.services.aws_nova_sonic.context.AWSNovaSonicConversationHistoryMessage] = <factory>)[source]

Bases: object

Parameters:
  • system_instruction (str)

  • messages (list[AWSNovaSonicConversationHistoryMessage])

system_instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage]
class pipecat.services.aws_nova_sonic.context.AWSNovaSonicLLMContext(messages=None, tools=None, **kwargs)[source]

Bases: OpenAILLMContext

static upgrade_to_nova_sonic(obj, system_instruction)[source]
Parameters:
  • obj (OpenAILLMContext)

  • system_instruction (str)

Return type:

AWSNovaSonicLLMContext

get_messages_for_initializing_history()[source]
Return type:

AWSNovaSonicConversationHistory

get_messages_for_persistent_storage()[source]
from_standard_message(message)[source]

Convert from OpenAI message format to OpenAI message format (passthrough).

OpenAI’s format allows both simple string content and structured content: - Simple: {“role”: “user”, “content”: “Hello”} - Structured: {“role”: “user”, “content”: [{“type”: “text”, “text”: “Hello”}]}

Since OpenAI is our standard format, this is a passthrough function.

Parameters:

message (dict) – Message in OpenAI format

Returns:

Same message, unchanged

Return type:

dict

buffer_user_text(text)[source]
flush_aggregated_user_text()[source]
Return type:

str

buffer_assistant_text(text)[source]
flush_aggregated_assistant_text()[source]
class pipecat.services.aws_nova_sonic.context.AWSNovaSonicMessagesUpdateFrame(context: pipecat.services.aws_nova_sonic.context.AWSNovaSonicLLMContext)[source]

Bases: DataFrame

Parameters:

context (AWSNovaSonicLLMContext)

context: AWSNovaSonicLLMContext
class pipecat.services.aws_nova_sonic.context.AWSNovaSonicUserContextAggregator(context, *, params=None, **kwargs)[source]

Bases: OpenAIUserContextAggregator

Parameters:
  • context (OpenAILLMContext)

  • params (LLMUserAggregatorParams | None)

async process_frame(frame, direction=FrameDirection.DOWNSTREAM)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)

class pipecat.services.aws_nova_sonic.context.AWSNovaSonicAssistantContextAggregator(context, *, params=None, **kwargs)[source]

Bases: OpenAIAssistantContextAggregator

Parameters:
  • context (OpenAILLMContext)

  • params (LLMAssistantAggregatorParams | None)

async process_frame(frame, direction)[source]
Parameters:
  • frame (Frame)

  • direction (FrameDirection)

async handle_function_call_result(frame)[source]

Handle the result of a function call.

Updates the context with the function call result, replacing any previous IN_PROGRESS status.

Parameters:

frame (FunctionCallResultFrame) – Frame containing the function call result.

class pipecat.services.aws_nova_sonic.context.AWSNovaSonicContextAggregatorPair(_user: pipecat.services.aws_nova_sonic.context.AWSNovaSonicUserContextAggregator, _assistant: pipecat.services.aws_nova_sonic.context.AWSNovaSonicAssistantContextAggregator)[source]

Bases: object

Parameters:
  • _user (AWSNovaSonicUserContextAggregator)

  • _assistant (AWSNovaSonicAssistantContextAggregator)

user()[source]
Return type:

AWSNovaSonicUserContextAggregator

assistant()[source]
Return type:

AWSNovaSonicAssistantContextAggregator