south153

Reputation: 57

Lambda writing empty records to s3

I am trying to write from Firehose to S3 using the following test event:

{
  "TICKER_SYMBOL": "QXZ",
  "SECTOR": "HEALTHCARE",
  "CHANGE": -0.05,
  "PRICE": 84.51
}

My Lambda code is:

import json
import base64
def lambda_handler(event, context):
    print(event)
    for record in event['records']:
        # Kinesis data is base64 encoded, so decode here
        payload=base64.b64decode(record["data"])
        print("Decoded payload: " + str(payload))
        json_object = {}
        output = []
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')}
        output.append(output_record)
        print(output)
    return {'records': output}

The code prints the expected output, and Kinesis writes to the successful S3 folder, but when I download the file from S3 it contains only an empty {}. What is causing the empty {} in S3?

Upvotes: 0

Views: 271

Answers (1)

Andrew Nguonly

Reputation: 2621

What is causing the empty {} in s3?

The Lambda function is serializing json_object, which is an empty dictionary {}. If you want to serialize/return the original payload with no transformations, serialize/return the original, undecoded data:

output_record = {
    'recordId': record['recordId'],
    'result': 'Ok',
    'data': record['data']
}
output.append(output_record)
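
To see why the file contains {}, it can help to decode what the original function puts in 'data' and compare it with the incoming record. This is only a minimal sketch: it assumes the test event from the question as input, and the names sample, incoming, and returned are made up for illustration.

```python
import base64
import json

# Test event from the question, encoded the way Firehose hands it to Lambda.
sample = {"TICKER_SYMBOL": "QXZ", "SECTOR": "HEALTHCARE", "CHANGE": -0.05, "PRICE": 84.51}
incoming = base64.b64encode(json.dumps(sample).encode("utf-8")).decode("utf-8")

# What the original function returns in 'data': an encoded empty dictionary.
json_object = {}
returned = base64.b64encode(json.dumps(json_object).encode("utf-8")).decode("utf-8")

print(base64.b64decode(returned))   # b'{}'
print(base64.b64decode(incoming))   # the original JSON bytes
```

The decoded payload is printed correctly in the logs, but it is never used to build the response, so only the empty dictionary reaches S3.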

Kinesis Firehose can deliver to S3 without a Lambda data transformation. If you don't need to transform the data, my suggestion is to leave the Data Transformation configuration out entirely.

If you do require data transformations, make sure to:

  1. Perform the transformations on the decoded payload
  2. Fix the bug: move output = [] outside of the for loop, otherwise the list is reset on every iteration and only the last record is returned
  3. Serialize and return the transformed payload

import base64
import json

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose delivers the record data base64 encoded
        payload = base64.b64decode(record['data'])
        payload_json = json.loads(payload)

        # TODO: do transformations on payload_json
        transformed_payload = json.dumps(payload_json)

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
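
One way to check this before deploying is to call the handler locally with a hand-built event and decode what comes back. The sketch below is an assumption-heavy illustration: make_event is a hypothetical helper that fills in only the fields the handler reads (real Firehose events carry more), the second ticker record is invented, and lowercasing SECTOR stands in for whatever transformation you actually need.

```python
import base64
import json


def lambda_handler(event, context):
    output = []  # created once, so every record is kept
    for record in event['records']:
        payload_json = json.loads(base64.b64decode(record['data']))
        # Hypothetical transformation, for illustration only
        payload_json['SECTOR'] = payload_json['SECTOR'].lower()
        transformed_payload = json.dumps(payload_json)
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}


def make_event(payloads):
    """Build a minimal Firehose-style event (hypothetical helper)."""
    return {'records': [
        {
            'recordId': str(i),
            'data': base64.b64encode(json.dumps(p).encode('utf-8')).decode('utf-8'),
        }
        for i, p in enumerate(payloads)
    ]}


event = make_event([
    {"TICKER_SYMBOL": "QXZ", "SECTOR": "HEALTHCARE", "CHANGE": -0.05, "PRICE": 84.51},
    {"TICKER_SYMBOL": "ABC", "SECTOR": "ENERGY", "CHANGE": 1.2, "PRICE": 10.0},  # invented record
])
result = lambda_handler(event, None)

# Decode each returned record to inspect what Firehose would write to S3.
decoded = [json.loads(base64.b64decode(r['data'])) for r in result['records']]
for item in decoded:
    print(item)
```

If output = [] were still inside the loop, result['records'] would contain only the last record, which makes the bug easy to spot with a two-record event like this one.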

Upvotes: 1
