Utils

pipecat.services.aws.utils.get_presigned_url(*, region, credentials, language_code, media_encoding='pcm', sample_rate=16000, number_of_channels=1, enable_partial_results_stabilization=True, partial_results_stability='high', vocabulary_name=None, vocabulary_filter_name=None, show_speaker_label=False, enable_channel_identification=False)[source]

Create a presigned URL for AWS Transcribe streaming.

Parameters:
  • region (str)

  • credentials (Dict[str, str | None])

  • language_code (str)

  • media_encoding (str)

  • sample_rate (int)

  • number_of_channels (int)

  • enable_partial_results_stabilization (bool)

  • partial_results_stability (str)

  • vocabulary_name (str | None)

  • vocabulary_filter_name (str | None)

  • show_speaker_label (bool)

  • enable_channel_identification (bool)

Return type:

str

class pipecat.services.aws.utils.AWSTranscribePresignedURL(access_key, secret_key, session_token, region='us-east-1')[source]

Bases: object

Parameters:
  • access_key (str)

  • secret_key (str)

  • session_token (str)

  • region (str)

get_request_url(sample_rate, language_code='', media_encoding='pcm', vocabulary_name='', vocabulary_filter_name='', show_speaker_label=False, enable_channel_identification=False, number_of_channels=1, enable_partial_results_stabilization=False, partial_results_stability='')[source]
Parameters:
  • sample_rate (int)

  • language_code (str)

  • media_encoding (str)

  • vocabulary_name (str)

  • vocabulary_filter_name (str)

  • show_speaker_label (bool)

  • enable_channel_identification (bool)

  • number_of_channels (int)

  • enable_partial_results_stabilization (bool)

  • partial_results_stability (str)

Return type:

str

pipecat.services.aws.utils.get_headers(header_name, header_value)[source]

Build a header following AWS event stream format.

Parameters:
  • header_name (str)

  • header_value (str)

Return type:

bytearray

pipecat.services.aws.utils.build_event_message(payload)[source]

Build an event message for AWS Transcribe streaming. Matches AWS sample: https://github.com/aws-samples/amazon-transcribe-streaming-python-websockets/blob/main/eventstream.py

Parameters:

payload (bytes)

Return type:

bytes

pipecat.services.aws.utils.decode_event(message)[source]