Andy_ye
Andy_ye

Reputation: 570

Streaming audio to Amazon Connect through Kinesis Video Streams

What I am trying to achieve is the following.

In Amazon Connect, when I create a flow with a Start Streaming Data block, it starts streaming to a Kinesis Video Stream, and I can call a Lambda function with the stream's information.

Next, I want this lambda function to stream some audio into the Kinesis Video Stream, so the customer on the phone can hear it.

I have created the following Python script, but I cannot hear the audio on the phone call. I have looked at the Connect error logs, and no errors are occurring. A fix for this script, or any examples of a similar pipeline, would be awesome.

import boto3
import botocore.exceptions
import types
from functools import lru_cache
import time
import io
import wave

class KinesisVideo(object):
    """Single entry point for the Kinesis Video control and data APIs.

    On construction, one client is created per service listed in
    ``_CONTROL_SERVICES`` and ``_DATA_SERVICES`` and every method name found
    in each client's ``meta.method_to_api_mapping`` is recorded together with
    the service that provides it.  Accessing such a method name on this
    object returns a keyword-only callable that picks the matching client
    and forwards the call.

    For data services the client is created against the endpoint returned by
    ``get_data_endpoint`` for the stream named by ``StreamName`` or
    ``StreamARN``.  Stream ARNs, endpoints and clients are cached per
    instance, so the caches are released together with the instance.
    """

    _CONTROL_SERVICES = ('kinesisvideo', )
    _DATA_SERVICES = ('kinesis-video-media', 'kinesis-video-archived-media')

    def __init__(self, session=None):
        """Build the method -> service table.

        :param session: object exposing ``client(service, endpoint_url=...)``;
            defaults to a fresh ``boto3.Session()``.
        """
        self._session = session or boto3.Session()
        # The caches must exist before the first _get_client_for_service call.
        self._arn_cache = {}
        self._endpoint_cache = {}
        self._client_cache = {}
        self._methods = {}
        for service in self._CONTROL_SERVICES + self._DATA_SERVICES:
            prototype = self._get_client_for_service(service)
            for method in prototype.meta.method_to_api_mapping:
                self._methods[method] = service

    def _get_arn_for_stream_name(self, stream_name):
        """Return the stream ARN for ``stream_name`` via ``describe_stream`` (cached)."""
        if stream_name not in self._arn_cache:
            response = self.describe_stream(StreamName=stream_name)
            self._arn_cache[stream_name] = response['StreamInfo']['StreamARN']
        return self._arn_cache[stream_name]

    def _get_endpoint_for_stream_method(self, stream_arn, method):
        """Return the data endpoint for calling ``method`` on ``stream_arn`` (cached).

        The API name passed to ``get_data_endpoint`` is the upper-cased
        method name (``put_media`` -> ``PUT_MEDIA``).
        """
        key = (stream_arn, method)
        if key not in self._endpoint_cache:
            response = self.get_data_endpoint(StreamARN=stream_arn, APIName=method.upper())
            self._endpoint_cache[key] = response['DataEndpoint']
        return self._endpoint_cache[key]

    def _get_client_for_service(self, service, endpoint_url=None):
        """Return the client for ``(service, endpoint_url)``, creating it once.

        Clients for ``kinesis-video-media`` are additionally patched with a
        ``put_media`` operation.
        """
        key = (service, endpoint_url)
        if key not in self._client_cache:
            client = self._session.client(service, endpoint_url=endpoint_url)
            if service == 'kinesis-video-media':
                client = self._patch_kinesis_video_media(client)
            self._client_cache[key] = client
        return self._client_cache[key]

    def _get_client_by_arguments(self, method, stream_name=None, stream_arn=None):
        """Return the client that should serve ``method``.

        Control-service methods use the default client.  Data-service
        methods need exactly one of ``stream_name`` / ``stream_arn`` so the
        stream-specific endpoint can be resolved.

        :raises botocore.exceptions.ParamValidationError: when neither or
            both of ``stream_name`` and ``stream_arn`` are given for a
            data-service method.
        """
        service = self._methods[method]
        if service not in self._DATA_SERVICES:
            return self._get_client_for_service(service)
        # XOR: exactly one of the two identifiers must be truthy.
        if not (bool(stream_name) ^ bool(stream_arn)):
            raise botocore.exceptions.ParamValidationError(report=
                'One of StreamName or StreamARN must be defined ' + \
                'to determine service endpoint'
            )
        stream_arn = self._get_arn_for_stream_name(stream_name) if stream_name else stream_arn
        endpoint_url = self._get_endpoint_for_stream_method(stream_arn, method)
        return self._get_client_for_service(service, endpoint_url)

    def __getattr__(self, method):
        """Return a forwarding callable for a known API method name.

        Only invoked when normal attribute lookup fails.

        :raises AttributeError: when ``method`` is not a recorded API method.
        """
        # Read the table through __dict__: on an instance whose __init__ has
        # not run (object.__new__, copy, pickle) a plain ``self._methods``
        # would re-enter __getattr__ and recurse until RecursionError.
        methods = self.__dict__.get('_methods', {})
        if method not in methods:
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(type(self).__name__, method)
            )
        kwarg_map = {'StreamName': 'stream_name', 'StreamARN': 'stream_arn'}

        def _api_call(**kwargs):
            # Only the stream identifiers influence which client is chosen;
            # the full kwargs are forwarded to the API call unchanged.
            filtered_kwargs = {kwarg_map[k]: v for k, v in kwargs.items() if k in kwarg_map}
            client = self._get_client_by_arguments(method, **filtered_kwargs)
            return getattr(client, method)(**kwargs)
        return _api_call

    @staticmethod
    def _patch_kinesis_video_media(client):
        """Add a ``PutMedia`` operation and ``put_media`` method to ``client``.

        The operation and its input/output shapes are injected into the
        client's private service model, and the method is registered in
        ``client.meta.method_to_api_mapping``.  Returns the same client.

        NOTE(review): the 'Timestamp', 'ResourceARN', 'StreamName' and
        'Payload' shapes and the listed error shapes are referenced but not
        defined here - presumably they already exist in the service model;
        confirm against the installed botocore version.  This relies on
        botocore internals (``_service_description``, ``_shape_map``,
        ``_make_api_call``) that may change between releases.
        """
        client.meta.service_model._service_description['operations']['PutMedia'] = {
            'name': 'PutMedia',
            'http': {'method': 'POST', 'requestUri': '/putMedia'},
            'input': {'shape': 'PutMediaInput'},
            'output': {'shape': 'PutMediaOutput'},
            'errors': [
                {'shape': 'ResourceNotFoundException'},
                {'shape': 'NotAuthorizedException'},
                {'shape': 'InvalidEndpointException'},
                {'shape': 'ClientLimitExceededException'},
                {'shape': 'ConnectionLimitExceededException'},
                {'shape': 'InvalidArgumentException'}
            ],
            'authtype': 'v4-unsigned-body',
        }
        client.meta.service_model._shape_resolver._shape_map['PutMediaInput'] = {
            'type': 'structure',
            'required': ['FragmentTimecodeType', 'ProducerStartTimestamp'],
            'members': {
                'FragmentTimecodeType': {
                    'shape': 'FragmentTimecodeType',
                    'location': 'header',
                    'locationName': 'x-amzn-fragment-timecode-type',
                },
                'ProducerStartTimestamp': {
                    'shape': 'Timestamp',
                    'location': 'header',
                    'locationName': 'x-amzn-producer-start-timestamp',
                },
                'StreamARN': {
                    'shape': 'ResourceARN',
                    'location': 'header',
                    'locationName': 'x-amzn-stream-arn',
                },
                'StreamName': {
                    'shape': 'StreamName',
                    'location': 'header',
                    'locationName': 'x-amzn-stream-name',
                },
                'Payload': {
                    'shape': 'Payload',
                },
            },
            'payload': 'Payload',
        }
        client.meta.service_model._shape_resolver._shape_map['PutMediaOutput'] = {
            'type': 'structure',
            'members': {'Payload': {'shape': 'Payload'}},
            'payload': 'Payload',
        }
        client.meta.service_model._shape_resolver._shape_map['FragmentTimecodeType'] = {
            'type': 'string',
            'enum': ['ABSOLUTE', 'RELATIVE'],
        }
        # Bind a put_media method on this client instance only.
        client.put_media = types.MethodType(
            lambda self, **kwargs: self._make_api_call('PutMedia', kwargs),
            client,
        )
        client.meta.method_to_api_mapping['put_media'] = 'PutMedia'
        return client

def main():
    """Upload a local MKV file to a Kinesis Video stream and print the acks.

    Lists the available streams, sends the contents of
    ``packages/python/output.mkv`` to ``MY_STREAM_NAME`` with ``put_media``
    and prints each JSON event returned in the response payload.

    NOTE(review): this only puts media into the stream; whether a caller on
    the phone can hear it depends on the consumer of the stream - confirm
    that the service reading this stream plays back producer-side media.
    """
    import json

    # NOTE(review): placeholder credentials - do not commit real keys here;
    # prefer the default boto3 credential chain (environment / IAM role).
    session = boto3.Session(
        aws_access_key_id='MY_ACCESS_KEY_ID',
        aws_secret_access_key='MY_SECRET_ACCESS_KEY',
        region_name='MY_REGION'
    )

    video = KinesisVideo(session=session)

    print(video.list_streams())

    # Whole seconds since the epoch, as a string.
    start_tmstp = str(int(time.time()))

    print(start_tmstp)

    with open('packages/python/output.mkv', 'rb') as payload_file:
        payload_data = payload_file.read()

    # Send the MKV bytes untouched.  Wrapping them in a WAV container would
    # prepend a RIFF header to the Matroska data, and handing over a buffer
    # that has just been written to leaves its position at EOF, so the
    # request body would likely be sent empty.
    response = video.put_media(
        StreamName='MY_STREAM_NAME',
        Payload=payload_data,
        FragmentTimecodeType='RELATIVE',
        ProducerStartTimestamp=start_tmstp,
    )

    # The response payload is read as newline-separated JSON documents.
    for event in map(json.loads, response['Payload'].read().decode('utf-8').splitlines()):
        print(event)


if __name__ == '__main__':
    main()

Upvotes: 0

Views: 291

Answers (1)

ledge
ledge

Reputation: 474

This isn't currently possible. There is no way to stream audio into Connect.

Upvotes: 0

Related Questions