Reputation: 449
I am using the Lambda (Python) code below to decode a CloudWatch metric stream and then create dynamic partitions based on the metric_name field in the stream data. But as written, if a stream file contains data for 3 different metric_name values, the code picks only the first metric name, creates a partition based on it, and puts all 3 metrics in that same partition. This is not the intended output. Can anyone help with this code?
I got the idea for this code from the AWS documentation and adapted it to my stream data (reference: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html).
Code:
```python
from __future__ import print_function
import base64
import json
import datetime


# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])

    # Create return value.
    firehose_records_output = {'records': []}

    # Go through records and process them
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload (one JSON object per line)
        payload = base64.b64decode(firehose_record_input['data']).decode('utf-8')
        metrics = list(filter(None, payload.split("\n")))
        if len(metrics) > 1:
            print("Length of array = {}".format(len(metrics)))
        new_payload = []
        for metric in metrics:
            new_payload.append(json.loads(metric))
        json_value = new_payload
        print("Record after processing")
        print(json_value)
        print("\n")

        # working partition_keys (uses only the first metric's name)
        partition_keys = {"metric_name": json_value[0]['metric_name']}

        # Create output Firehose record and add modified payload and record ID to it.
        # Must set proper record ID
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': {'partitionKeys': partition_keys}}

        # Add the record to the list of output records.
        firehose_records_output['records'].append(firehose_record_output)

    # At the end return processed records
    print('**************firehose_records_output**********************')
    print(firehose_records_output)
    print('****************end********************')
    return firehose_records_output
```
Stream data sample:
```json
{
    "metric_stream_name": "MyMetricStream",
    "account_id": "12345678",
    "region": "us-east-1",
    "namespace": "AWS/EC2",
    "metric_name": "DiskWriteOps",
    "dimensions": {
        "InstanceId": "i1234"
    },
    "timestamp": 1611929698000,
    "value": {
        "count": 3.0,
        "sum": 20.0,
        "max": 18.0,
        "min": 0.0
    },
    "unit": "Seconds"
},
{
    "metric_stream_name": "MyMetricStream",
    "account_id": "12345678",
    "region": "us-east-1",
    "namespace": "AWS/EC2",
    "metric_name": "DiskReadIOps",
    "dimensions": {
        "InstanceId": "i1234"
    },
    "timestamp": 1611929698000,
    "value": {
        "count": 3.0,
        "sum": 20.0,
        "max": 18.0,
        "min": 0.0
    },
    "unit": "Seconds"
},
{
    "metric_stream_name": "MyMetricStream",
    "account_id": "12345678",
    "region": "us-east-1",
    "namespace": "AWS/EC2",
    "metric_name": "CPUUtilization",
    "dimensions": {
        "InstanceId": "i1234"
    },
    "timestamp": 1611929698000,
    "value": {
        "count": 3.0,
        "sum": 20.0,
        "max": 18.0,
        "min": 0.0
    },
    "unit": "Seconds"
}
```
Based on the data above, 3 partitions should be created, but the code creates only 1 partition, named after the first metric_name (DiskWriteOps), and puts all the metrics inside it. I think this is caused by `partition_keys = {"metric_name": json_value[0]['metric_name']}`.
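To make the effect concrete, here is a minimal sketch that packs three metrics into one Firehose-style input record, assuming the newline-delimited layout that the handler's `payload.split("\n")` already relies on, then decodes it the same way and compares the metric names present with the single key the handler assigns. The helpers `make_firehose_record` and `describe_partitioning` are hypothetical, and the metrics are trimmed to the fields used here.

```python
import base64
import json


def make_firehose_record(metric_names):
    """Hypothetical helper: put one JSON object per line into a single
    base64-encoded record, mimicking a multi-metric Firehose input record."""
    lines = [json.dumps({"namespace": "AWS/EC2", "metric_name": name})
             for name in metric_names]
    data = base64.b64encode("\n".join(lines).encode("utf-8")).decode("ascii")
    return {"recordId": "1", "data": data}


def describe_partitioning(record):
    """Decode the record as the question's handler does and return
    (metric names present, partition key the handler would assign)."""
    payload = base64.b64decode(record["data"]).decode("utf-8")
    metrics = [json.loads(line) for line in payload.split("\n") if line]
    names_present = sorted({m["metric_name"] for m in metrics})
    # Mirrors partition_keys = {"metric_name": json_value[0]['metric_name']}
    key_assigned = metrics[0]["metric_name"]
    return names_present, key_assigned


record = make_firehose_record(["DiskWriteOps", "DiskReadIOps", "CPUUtilization"])
present, assigned = describe_partitioning(record)
print(present)   # ['CPUUtilization', 'DiskReadIOps', 'DiskWriteOps']
print(assigned)  # DiskWriteOps
```

Since `partitionKeys` is attached to the whole output record, one record can only land under one prefix, however the key is computed.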
Upvotes: 0
Views: 671
Reputation: 20860
If you pass that record to Firehose as it is, it is treated as one single JSON record, so you see only one partition, named after the `json_value[0]['metric_name']` value.

In this case, you need to do some pre-processing via Lambda functions or Kinesis Data Analytics to split that JSON into 3 separate records, and then use dynamic partitioning on metric_name to create partitions with the right names.
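A minimal sketch of the in-memory part of that split step, under stated assumptions. As far as the Firehose transformation contract goes, each output record must carry the `recordId` of an input record, so the transformation Lambda itself cannot fan one record out into three; the split has to happen before the data reaches the partitioning stream (for example, in a separate Lambda that re-sends the pieces). The functions `split_metrics` and `batches` are hypothetical; `MAX_BATCH = 500` assumes the per-call record limit of Firehose `PutRecordBatch`, and the actual send (e.g. boto3's `put_record_batch`) is left out.

```python
import json

# Assumption: per-call record limit of Firehose PutRecordBatch.
MAX_BATCH = 500


def split_metrics(payload):
    """Split a newline-delimited metric-stream payload into one record per
    metric, each shaped like a PutRecordBatch entry ({"Data": bytes}).
    The trailing newline keeps objects on separate lines in the S3 output."""
    records = []
    for line in payload.split("\n"):
        if not line:
            continue  # skip empty lines, like filter(None, ...) in the question
        metric = json.loads(line)
        records.append({"Data": (json.dumps(metric) + "\n").encode("utf-8")})
    return records


def batches(records, size=MAX_BATCH):
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


payload = "\n".join(
    json.dumps({"metric_name": name, "value": {"sum": 20.0}})
    for name in ["DiskWriteOps", "DiskReadIOps", "CPUUtilization"]
)
records = split_metrics(payload)
print(len(records))                           # 3
print([len(b) for b in batches(records, 2)])  # [2, 1]
```

Each batch would then be sent to the delivery stream that has dynamic partitioning enabled, where every record holds exactly one metric.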
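Once each record holds a single metric, `json_value` is a single object rather than a list, so the dynamic partition code will look like this:

```python
partition_keys = {"metric_name": json_value['metric_name']}
```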
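Put together, a transformation handler for the partitioning stream could look like the sketch below. It assumes the records were split upstream so that each one holds exactly one metric-stream JSON object, and it keeps the output shape from the question (`recordId`, `result`, `data`, `metadata.partitionKeys`). As a design choice of this sketch, a record that still contains several objects is marked `ProcessingFailed` instead of being filed under the first metric's name. The `encode` helper and the sample event are hypothetical.

```python
import base64
import json


def lambda_handler(event, context):
    """Sketch of a Firehose transformation handler that assumes every input
    record carries exactly one metric-stream JSON object."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        lines = [line for line in payload.split("\n") if line]
        if len(lines) != 1:
            # Not split upstream: fail the record rather than misfile it.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        metric = json.loads(lines[0])
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": record["data"],  # payload passed through unchanged
            "metadata": {"partitionKeys": {"metric_name": metric["metric_name"]}},
        })
    return {"records": output}


def encode(obj):
    # Hypothetical helper: one newline-terminated JSON object per record.
    return base64.b64encode((json.dumps(obj) + "\n").encode("utf-8")).decode("ascii")


event = {"records": [
    {"recordId": "a", "data": encode({"metric_name": "DiskWriteOps"})},
    {"recordId": "b", "data": encode({"metric_name": "CPUUtilization"})},
]}
result = lambda_handler(event, None)
for rec in result["records"]:
    print(rec["recordId"], rec["result"], rec["metadata"]["partitionKeys"])
# a Ok {'metric_name': 'DiskWriteOps'}
# b Ok {'metric_name': 'CPUUtilization'}
```

The key can then be referenced in the delivery stream's S3 prefix, for example `metric_name=!{partitionKeyFromLambda:metric_name}/`, so each metric lands under its own prefix.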
Upvotes: 1