Reputation: 272
I am trying to send data to a stream and then use Kinesis Firehose to deliver it to Elasticsearch. I am using a Python Lambda function to convert the data to JSON before it is pushed, but the Lambda fails with the error below.
[ERROR] KeyError: 'Records'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 7, in lambda_handler
    for record in event["Records"]:
I can see the records in the shard using a shard iterator, as shown below.
{
    "Records": [
        {
            "SequenceNumber": "49599580114447666780699883212202628138922281244234350610",
            "ApproximateArrivalTimestamp": 1568741427.415,
            "Data": "MjAwNi8wMS8wMSAwMDowMDowMHwzMTA4IE9DQ0lERU5UQUwgRFJ8M3wzQyAgICAgICAgfDExMTV8MTA4NTEoQSlWQyBUQUtFIFZFSCBXL08gT1dORVJ8MjQwNHwzOC41NTA0MjA0N3wtMTIxLjM5MTQxNTh8MjAxOS8wOS8xNyAyMzowMDoyNA==",
            "PartitionKey": "1"
        },
I am using the Lambda function below to process the stream.
import base64  # needed for b64decode below
import json

print("Loading the function")
success = 0
failure = 0

def lambda_handler(event, context):
    global success  # the module-level counter is updated below
    for record in event["Records"]:
        print(record)
        payload = base64.b64decode(record["Data"]).decode('utf-8')
        match = payload.split('|')
        result = {}
        if match:
            # create a dict object of the row
            # build all fields from array
            result["crime_time"] = match[0]
            result["address"] = match[1]
            result['district'] = int(match[2])
            result['beat'] = match[3]
            result['grid'] = int(match[4])
            result['description'] = match[5]
            result['crime_id'] = int(match[6])
            result['latitude'] = float(match[7])
            result['longitude'] = float(match[8])
            result['load_time'] = match[9]
            result['location'] = {
                'lat': float(match[7]),
                'lon': float(match[8])
            }
            success += 1
    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }
However, the Lambda function fails with the error above after I send data to the stream.
Upvotes: 0
Views: 1179
Reputation: 181
What is the error, and did you check the CloudWatch logs to see what is happening? I think they will give you a good indication of what is going on.
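One thing the logs may show is that the event does not have the shape the handler expects. When a function is attached to Firehose as a data-transformation Lambda, the event carries a lowercase "records" list whose items have "recordId" and "data" keys, which is different from the "Records"/"Data" output of a shard iterator. The following is only a minimal sketch under that assumption (it has not been confirmed for this setup). The helper name to_document is hypothetical, and the field names are taken from the question. It logs the top-level keys so they show up in CloudWatch, then returns each record with the "recordId", "result" and "data" fields that Firehose expects back.

```python
import base64
import json


def to_document(payload):
    """Hypothetical helper: turn one pipe-delimited row into a dict."""
    m = payload.split("|")
    return {
        "crime_time": m[0],
        "address": m[1],
        "district": int(m[2]),
        "beat": m[3],
        "grid": int(m[4]),
        "description": m[5],
        "crime_id": int(m[6]),
        "latitude": float(m[7]),
        "longitude": float(m[8]),
        "load_time": m[9],
        "location": {"lat": float(m[7]), "lon": float(m[8])},
    }


def lambda_handler(event, context):
    # Log the top-level keys so CloudWatch shows the real event shape.
    print("event keys:", list(event.keys()))

    output = []
    # Assumption: Firehose transformation events use lowercase "records".
    for record in event["records"]:
        try:
            payload = base64.b64decode(record["data"]).decode("utf-8")
            doc = to_document(payload)
            data = base64.b64encode(json.dumps(doc).encode("utf-8")).decode("utf-8")
            output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
        except (IndexError, ValueError) as exc:
            # Malformed rows are handed back unchanged and marked as failed.
            print("could not parse record", record["recordId"], exc)
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

If the logged keys turn out to be something else (for example "Records" with a nested "kinesis" entry, which is what a direct Kinesis stream trigger sends), the loop would need to be adjusted to that shape instead.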
Upvotes: 1