Reputation: 570
What I am trying to achieve is the following.
In Amazon Connect, when I create a Flow with Start Streaming Data block, it starts streaming to a Kinesis Video Stream, and I can call a lambda function with the streams information.
Next, I want this lambda function to stream some audio into the Kinesis Video Stream, so the customer on the phone can hear it.
I have created the following python script, but I cannot hear the audio on the phone call. I have looked at the error logs of connect, and there are no errors that are occuring. A solution to fix this script, or any examples of a similar pipeline would be awesome.
import boto3
import botocore.exceptions
import types
from functools import lru_cache
import time
import io
import wave
class KinesisVideo(object):
_CONTROL_SERVICES = ('kinesisvideo', )
_DATA_SERVICES = ('kinesis-video-media', 'kinesis-video-archived-media')
def __init__(self, session=None):
self._session = session or boto3.Session()
self._methods = {}
for service in self._CONTROL_SERVICES + self._DATA_SERVICES:
prototype = self._get_client_for_service(service)
for method in prototype.meta.method_to_api_mapping.keys():
self._methods[method] = service
@lru_cache()
def _get_arn_for_stream_name(self, stream_name):
response = self.describe_stream(StreamName=stream_name)
return response['StreamInfo']['StreamARN']
@lru_cache()
def _get_endpoint_for_stream_method(self, stream_arn, method):
response = self.get_data_endpoint(StreamARN=stream_arn, APIName=method.upper())
return response['DataEndpoint']
@lru_cache()
def _get_client_for_service(self, service, endpoint_url=None):
client = self._session.client(service, endpoint_url=endpoint_url)
if service == 'kinesis-video-media':
client = self._patch_kinesis_video_media(client)
return client
@lru_cache()
def _get_client_by_arguments(self, method, stream_name=None, stream_arn=None):
service = self._methods[method]
if service not in self._DATA_SERVICES:
return self._get_client_for_service(service)
if not (bool(stream_name) ^ bool(stream_arn)):
raise botocore.exceptions.ParamValidationError(report=
'One of StreamName or StreamARN must be defined ' + \
'to determine service endpoint'
)
stream_arn = self._get_arn_for_stream_name(stream_name) if stream_name else stream_arn
endpoint_url = self._get_endpoint_for_stream_method(stream_arn, method)
return self._get_client_for_service(service, endpoint_url)
def __getattr__(self, method):
if method not in self._methods:
return getattr(super(), method)
kwarg_map = {'StreamName': 'stream_name', 'StreamARN': 'stream_arn'}
def _api_call(**kwargs):
filtered_kwargs = {kwarg_map[k]: v for k, v in kwargs.items() if k in kwarg_map}
client = self._get_client_by_arguments(method, **filtered_kwargs)
return getattr(client, method)(**kwargs)
return _api_call
@staticmethod
def _patch_kinesis_video_media(client):
client.meta.service_model._service_description['operations']['PutMedia'] = {
'name': 'PutMedia',
'http': {'method': 'POST', 'requestUri': '/putMedia'},
'input': {'shape': 'PutMediaInput'},
'output': {'shape': 'PutMediaOutput'},
'errors': [
{'shape': 'ResourceNotFoundException'},
{'shape': 'NotAuthorizedException'},
{'shape': 'InvalidEndpointException'},
{'shape': 'ClientLimitExceededException'},
{'shape': 'ConnectionLimitExceededException'},
{'shape': 'InvalidArgumentException'}
],
'authtype': 'v4-unsigned-body',
}
client.meta.service_model._shape_resolver._shape_map['PutMediaInput'] = {
'type': 'structure',
'required': ['FragmentTimecodeType', 'ProducerStartTimestamp'],
'members': {
'FragmentTimecodeType': {
'shape': 'FragmentTimecodeType',
'location': 'header',
'locationName': 'x-amzn-fragment-timecode-type',
},
'ProducerStartTimestamp': {
'shape': 'Timestamp',
'location': 'header',
'locationName': 'x-amzn-producer-start-timestamp',
},
'StreamARN': {
'shape': 'ResourceARN',
'location': 'header',
'locationName': 'x-amzn-stream-arn',
},
'StreamName': {
'shape': 'StreamName',
'location': 'header',
'locationName': 'x-amzn-stream-name',
},
'Payload': {
'shape': 'Payload',
},
},
'payload': 'Payload',
}
client.meta.service_model._shape_resolver._shape_map['PutMediaOutput'] = {
'type': 'structure',
'members': {'Payload': {'shape': 'Payload'}},
'payload': 'Payload',
}
client.meta.service_model._shape_resolver._shape_map['FragmentTimecodeType'] = {
'type': 'string',
'enum': ['ABSOLUTE', 'RELATIVE'],
}
client.put_media = types.MethodType(
lambda self, **kwargs: self._make_api_call('PutMedia', kwargs),
client,
)
client.meta.method_to_api_mapping['put_media'] = 'PutMedia'
return client
def main():
session = boto3.Session(
aws_access_key_id='MY_ACCESS_KEY_ID',
aws_secret_access_key='MY_SECRET_ACCESS_KEY',
region_name='MY_REGION'
)
video = KinesisVideo(session=session)
print(video.list_streams())
start_tmstp = repr(time.time()).split('.')[0]
print(start_tmstp)
# Ensure the audio data is correctly handled for Amazon Connect
with open('packages/python/output.mkv', 'rb') as payload_file:
payload_data = payload_file.read()
# Create a PCM file from the audio data
pcm_data = io.BytesIO(payload_data)
with wave.open(pcm_data, 'wb') as wav_file:
wav_file.setnchannels(1) # Mono
wav_file.setsampwidth(2) # 2 bytes per sample
wav_file.setframerate(8000) # Sample rate
wav_file.writeframes(payload_data)
response = video.put_media(
StreamName='MY_STREAM_NAME',
Payload=pcm_data,
FragmentTimecodeType='RELATIVE',
ProducerStartTimestamp=start_tmstp,
)
import json
for event in map(json.loads, response['Payload'].read().decode('utf-8').splitlines()):
print(event)
if __name__ == '__main__':
main()
Upvotes: 0
Views: 291
Reputation: 474
This isn't currently possible. There is no way to stream audio into Connect.
Upvotes: 0