dbNovice

Reputation: 449

Python code for AWS CloudWatch metric streams & Kinesis Firehose dynamic partitioning

I am using the Lambda (Python) code below to decode a CloudWatch metric stream and create a dynamic partition based on the metric_name field in the stream data. But as written, if a stream record contains 3 different metric_name values, the code picks only the first metric name, creates the partition from it, and puts all 3 metrics into that same partition. This is not the intended output. Can anyone help with this code?

I got the idea for this code from the AWS documentation and changed it to match my stream data. (Reference: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html)

Code:

from __future__ import print_function
import base64
import json
 
# Signature for all Lambda functions that the user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload; metric streams deliver newline-delimited JSON
        payload = base64.b64decode(firehose_record_input['data']).decode('utf-8')
        metrics = list(filter(None, payload.split("\n")))

        if len(metrics) > 1:
            print("Length of array = {}".format(len(metrics)))
            json_value = [json.loads(metric) for metric in metrics]
            print("Record after processing")
            print(json_value)
            # exit()
            print("\n")
            # Partition key is taken from the first metric only, so every
            # metric in this record lands in the same partition
            partition_keys = {"metric_name": json_value[0]['metric_name']}
            
            # Create output Firehose record and add modified payload and record ID to it.
            firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                      'data': firehose_record_input['data'],
                                      'result': 'Ok',
                                      'metadata': { 'partitionKeys': partition_keys }}
     
            # Add the record to the list of output records.
            firehose_records_output['records'].append(firehose_record_output)
     
    # At the end return processed records
    print('**************firehose_records_output**********************')
    print(firehose_records_output)
    print('****************end********************')
    
    return firehose_records_output

Stream data sample:

{
    "metric_stream_name": "MyMetricStream",
    "account_id": "12345678",
    "region": "us-east-1",
    "namespace": "AWS/EC2",
    "metric_name": "DiskWriteOps",
    "dimensions": {
        "InstanceId": "i1234"
    },
    "timestamp": 1611929698000,
    "value": {
        "count": 3.0,
        "sum": 20.0,
        "max": 18.0,
        "min": 0.0
    },
    "unit": "Seconds"
},
{
    "metric_stream_name": "MyMetricStream",
    "account_id": "12345678",
    "region": "us-east-1",
    "namespace": "AWS/EC2",
    "metric_name": "DiskReadIOps",
    "dimensions": {
        "InstanceId": "i1234"
    },
    "timestamp": 1611929698000,
    "value": {
        "count": 3.0,
        "sum": 20.0,
        "max": 18.0,
        "min": 0.0
    },
    "unit": "Seconds"
},
{
    "metric_stream_name": "MyMetricStream",
    "account_id": "12345678",
    "region": "us-east-1",
    "namespace": "AWS/EC2",
    "metric_name": "CPUUtilization",
    "dimensions": {
        "InstanceId": "i1234"
    },
    "timestamp": 1611929698000,
    "value": {
        "count": 3.0,
        "sum": 20.0,
        "max": 18.0,
        "min": 0.0
    },
    "unit": "Seconds"
}

As per the above data, 3 partitions should be created, but only 1 partition is created using the first metric_name (DiskWriteOps) and all metrics are pushed into it (I think because of partition_keys = {"metric_name": json_value[0]['metric_name'] }).
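To illustrate with the sample above (objects abbreviated to the relevant field), that line can only ever yield the first metric's key:

json_value = [
    {"metric_name": "DiskWriteOps"},
    {"metric_name": "DiskReadIOps"},
    {"metric_name": "CPUUtilization"},
]
partition_keys = {"metric_name": json_value[0]["metric_name"]}
print(partition_keys)  # {'metric_name': 'DiskWriteOps'} for all three metrics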

Upvotes: 0

Views: 671

Answers (1)

Nishu Tayal

Reputation: 20860

If you pass that whole payload to Firehose as one record, it is treated as a single JSON record and therefore gets a single partition key, which is why you see only one partition (the json_value[0]['metric_name'] value).

In this case, you need to do some pre-processing, via a Lambda function or Kinesis Data Analytics, to split that payload into 3 separate records, and then use dynamic partitioning on metric_name to create partitions with the right names.
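One possible shape for that pre-processing step, as a minimal sketch: a transformation Lambda on a first delivery stream that drops each batched record and re-ingests every metric line as its own record into a second, dynamically partitioned delivery stream. The stream name below is hypothetical:

import base64
import boto3

firehose = boto3.client("firehose")

# Hypothetical name of the second delivery stream that has dynamic
# partitioning enabled.
PARTITIONED_STREAM = "metrics-partitioned"

def split_handler(event, context):
    """Fan each batched metric payload out as one Firehose record per metric."""
    output, split_records = [], []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        # Metric streams deliver newline-delimited JSON, one metric per line.
        for line in filter(None, payload.split("\n")):
            split_records.append({"Data": (line + "\n").encode("utf-8")})
        # Drop the original batched record from this stream's output.
        output.append({"recordId": record["recordId"],
                       "result": "Dropped",
                       "data": record["data"]})

    # put_record_batch accepts at most 500 records per call; a production
    # version would also retry anything reported via FailedPutCount.
    for i in range(0, len(split_records), 500):
        firehose.put_record_batch(DeliveryStreamName=PARTITIONED_STREAM,
                                  Records=split_records[i:i + 500])
    return {"records": output}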

The dynamic partitioning code will then look like this:

 partition_keys = {"metric_name": json_value['metric_name'] }

Upvotes: 1
