Reputation: 1009
I have the following Lambda function as part of a Kinesis Data Firehose record transformation. It converts msgpack records from the Kinesis input stream to JSON.
Lambda runtime: Python 3.6
from __future__ import print_function

import base64
import msgpack
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

        # Do custom processing on the payload here
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': json.dumps(payload, ensure_ascii=False).encode('utf8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
But Lambda throws the following error:
An error occurred during JSON serialization of response: b'
{
"id": "d23fd47f-3a62-4383-bcb3-abdb913ea572",
"timestamp": 1526358140730,
"message": "Hello World"
}
' is not JSON serializable
Traceback (most recent call last):
  File "/var/lang/lib/python3.6/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/var/lang/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/var/lang/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/var/runtime/awslambda/bootstrap.py", line 110, in decimal_serializer
    raise TypeError(repr(o) + " is not JSON serializable")
Am I doing anything wrong?
Upvotes: 7
Views: 12203
Reputation: 1009
I was able to fix the issue. Here is the code that worked for me:
from __future__ import print_function

import base64
import msgpack
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

        # Do custom processing on the payload here
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload).encode('utf-8') + b'\n').decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
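As far as I can tell, the change matters for two reasons: Lambda serializes the handler's return value to JSON, and a `bytes` value is not JSON serializable, so `data` has to be a `str`; and Firehose expects that string to be the base64 encoding of the transformed payload. Below is a minimal sketch of just that step, without the msgpack dependency. The `encode_firehose_data` helper and the record ID are made up for illustration and are not part of any AWS API.

```python
import base64
import json


def encode_firehose_data(payload):
    """Return payload as a base64-encoded JSON line (a str, not bytes)."""
    line = json.dumps(payload).encode('utf-8') + b'\n'
    return base64.b64encode(line).decode('utf-8')


record = {
    'recordId': 'example-id',  # hypothetical record ID, for illustration only
    'result': 'Ok',
    'data': encode_firehose_data({'message': 'Hello World'}),
}

# The whole response can now be serialized to JSON, which fails for bytes values.
response_json = json.dumps({'records': [record]})
```

The trailing `b'\n'` is optional. It only puts each JSON document on its own line in the delivered output.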
Upvotes: 10
Reputation: 1166
I managed to fix it this way:

import base64
import gzip
import json


def cloudwatch_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose delivers each record base64-encoded; the payload here is gzipped JSON
        compressed_payload = base64.b64decode(record['data'])
        uncompressed_payload = gzip.decompress(compressed_payload)
        print('uncompressed_payload', uncompressed_payload)
        payload = json.loads(uncompressed_payload)

        # Return the data as a base64-encoded str, not bytes
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
This solution handles gzip-compressed JSON records and uses only the Python standard library, so you don't need to package third-party modules such as msgpack.
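To try a handler like this locally, one option is to build a test event by hand. The sketch below assumes the incoming records hold gzipped JSON, as the handler above does. `make_test_event` and the record IDs are hypothetical and only meant for local testing.

```python
import base64
import gzip
import json


def make_test_event(messages):
    """Build a Firehose-style event whose records hold gzipped JSON (hypothetical helper)."""
    records = []
    for i, message in enumerate(messages):
        compressed = gzip.compress(json.dumps(message).encode('utf-8'))
        records.append({
            'recordId': str(i),  # made-up IDs, for local testing only
            'data': base64.b64encode(compressed).decode('utf-8'),
        })
    return {'records': records}


event = make_test_event([{'message': 'Hello World'}])

# Undo the encoding the same way the handler does, to check the round trip.
decoded = json.loads(gzip.decompress(base64.b64decode(event['records'][0]['data'])))
```

Calling `cloudwatch_handler(event, None)` with this event should then return one record whose `result` is `'Ok'`.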
Upvotes: 1
Reputation: 351
I have a Go version of a Lambda function that does the same kind of transformation:
https://github.com/hixichen/golang_lamda_decode_protobuf_firehose
Upvotes: 0