user2755996

Reputation: 115

Error creating a Firehose Datastream through Lambda with Dynamic Partitioning

I'm trying to create Kinesis Firehose delivery streams with Dynamic Partitioning from a Lambda function. The function creates a stream successfully if I remove the DynamicPartitioningConfiguration and ProcessingConfiguration elements and change the prefix to a Dynamic Partitioning friendly string. With those elements included, I get this error:

kinesis error UnexpectedParameter: Unexpected key 'DynamicPartitioningConfiguration' found in params

Here is the function that creates the streams. The redacted ARNs and Glue settings all work fine once the Dynamic Partitioning elements are removed.

const AWS = require("aws-sdk");
var firehose = new AWS.Firehose();

exports.handler = async (event) => {
    
    let params = {
      DeliveryStreamName: event.body.shop+"-data-stream",
      DeliveryStreamType: "DirectPut",
      ExtendedS3DestinationConfiguration: {
        BucketARN: 'arn:aws:s3:::'+event.body.shop,
        RoleARN: 'aValidARN',
        Prefix: '!{partitionKeyFromQuery:obj-type}/',
        ErrorOutputPrefix: 'errors/',
        DataFormatConversionConfiguration: {
            Enabled: true,
            InputFormatConfiguration: {
              Deserializer: {
                  OpenXJsonSerDe: {
                    CaseInsensitive: false,
                    ConvertDotsInJsonKeysToUnderscores: false
                  }
              }
            },  
            OutputFormatConfiguration: {
                Serializer: {
                  ParquetSerDe: {
                    Compression: "SNAPPY",
                    WriterVersion: "V1"
                  }                    
                }
            },
            SchemaConfiguration: {
                RoleARN: "aValidARN",
                DatabaseName: "aValidDBName",
                TableName: "aValidTableName"
            }
        },
        DynamicPartitioningConfiguration: {
            Enabled: "true"
        },  
        ProcessingConfiguration: {
          Enabled: true,
          Processors: [
            {
              Type: "MetadataExtraction",
              Parameters: [
                {
                  ParameterName: "MetadataExtractionQuery",
                  ParameterValue: 'type:.type'
                },
                {
                  ParameterName: "JsonParsingEngine",
                  ParameterValue: 'JQ-1.6'
                }
              ]
            }
          ]
        },         
      }, 
      Tags: [
        {
          Key: 'billing-'+event.body.shop,
          Value: 'true'
        }
      ]
    };
    
    try{
        await firehose.createDeliveryStream(params).promise();    
    }catch(e){
        console.log("kinesis error "+e);
        return {
            statusCode: 500,
            error: JSON.stringify(e)
        };          
    }

    return {
        statusCode: 200,
        body: JSON.stringify("Stream created"),
    };
};

The docs have been of very little help. According to the JS SDK reference, the element is exactly where it should be: nested within ExtendedS3DestinationConfiguration. I suspect that an undocumented conflict between settings is causing the unexpected-key error, but I'm new to Firehose and don't know enough to troubleshoot it. I based the ProcessingConfiguration element on the answer at https://stackoverflow.com/a/69047987/2755996 and the CloudFormation docs at https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-extendeds3destinationconfiguration.html#cfn-kinesisfirehose-deliverystream-extendeds3destinationconfiguration-dynamicpartitioningconfiguration .

Any help would be much appreciated.

Upvotes: 0

Views: 2562

Answers (2)

user2755996

Reputation: 115

The SDK can be added as a layer to a Lambda function, which makes more recent SDK changes available than the version bundled with the runtime. I used the instructions at https://aws.amazon.com/premiumsupport/knowledge-center/lambda-layer-aws-sdk-latest-version/ to update to v2.997 of the SDK. Individual v3 packages can also be added using this method.

I changed the params to

params = {
  DeliveryStreamName: event.body.shop+"-data-stream",
  DeliveryStreamType: "DirectPut",
  ExtendedS3DestinationConfiguration: {
    BucketARN: 'arn:aws:s3:::'+event.body.shop,
    RoleARN: 'aWorkingARN',
    Prefix: '!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/!{partitionKeyFromQuery:object_type}/',
    ErrorOutputPrefix: 'errors/',
    DataFormatConversionConfiguration: {
        Enabled: true,
        InputFormatConfiguration: {
          Deserializer: {
              OpenXJsonSerDe: {
                CaseInsensitive: false,
                ConvertDotsInJsonKeysToUnderscores: false
              }
          }
        },  
        OutputFormatConfiguration: {
            Serializer: {
              ParquetSerDe: {
                Compression: "SNAPPY",
                WriterVersion: "V1"
              }                    
            }
        },
        SchemaConfiguration: {
            RoleARN: "aWorkingARN",
            DatabaseName: "aWorkingDB",
            TableName: "aWorkingTable"
        }
    },
    DynamicPartitioningConfiguration: {
        Enabled: true
    },
    ProcessingConfiguration: {
      Enabled: true,
      Processors: [
        {
          Type: "MetadataExtraction",
          Parameters: [
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{object_type:.object_type}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{hour:.hour}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{day:.day}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{month:.month}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{year:.year}'
            },                
            {
              ParameterName: "JsonParsingEngine",
              ParameterValue: 'JQ-1.6'
            }
          ]
        }
      ]
    }        
  }, 
  Tags: [
    {
      Key: 'billing-'+event.body.shop,
      Value: 'true'
    }
  ]
};

and the function worked perfectly.
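
A possible simplification, offered as a sketch rather than something tested against Firehose: the params above pass one MetadataExtractionQuery parameter per key, whereas the examples in AWS's documentation use a single query that lists every key. Assuming that single-query form is accepted here too, the query and the matching prefix can be generated from one list of key names. `buildPartitioning` is a hypothetical helper, not part of the SDK.

```javascript
// Hypothetical helper: builds one JQ query and the matching S3 prefix
// from a list of top-level JSON key names.
function buildPartitioning(keys) {
  // e.g. "{year:.year,month:.month}"
  const query = "{" + keys.map((k) => `${k}:.${k}`).join(",") + "}";
  // e.g. "!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/"
  const prefix = keys.map((k) => `!{partitionKeyFromQuery:${k}}/`).join("");
  return { query, prefix };
}

const { query, prefix } = buildPartitioning([
  "year", "month", "day", "hour", "object_type",
]);

// Assumption: a single MetadataExtractionQuery carrying all keys.
const processingConfiguration = {
  Enabled: true,
  Processors: [
    {
      Type: "MetadataExtraction",
      Parameters: [
        { ParameterName: "MetadataExtractionQuery", ParameterValue: query },
        { ParameterName: "JsonParsingEngine", ParameterValue: "JQ-1.6" },
      ],
    },
  ],
};
```

Here `prefix` would be used as the Prefix value and `processingConfiguration` as the ProcessingConfiguration value inside ExtendedS3DestinationConfiguration, so the partition keys in the prefix and in the query cannot drift apart.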

Upvotes: 0

Ajinkya

Reputation: 404

Check the version of the AWS JS SDK. The Firehose dynamic partitioning feature was launched in v2.979.0, so use JS SDK v2.979.0 or above.

I faced a similar issue before with the Python SDK. Upgrading the version solved it.
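
The version check can be sketched roughly as below. `isAtLeast` is a hypothetical helper written for illustration, and the 2.979.0 threshold is the one quoted above; `AWS.VERSION` is the version string exposed by the v2 SDK.

```javascript
// Minimum aws-sdk v2 release with dynamic partitioning, as quoted in this answer.
const MIN_VERSION = "2.979.0";

// Returns true when `version` (e.g. "2.997.0") is >= `minimum`,
// comparing major, minor and patch numerically.
function isAtLeast(version, minimum = MIN_VERSION) {
  const a = version.split(".").map(Number);
  const b = minimum.split(".").map(Number);
  for (let i = 0; i < 3; i++) {
    const x = a[i] || 0;
    const y = b[i] || 0;
    if (x !== y) {
      return x > y;
    }
  }
  return true; // versions are equal
}

// In a Lambda handler (assumes aws-sdk v2 is what gets loaded):
//   const AWS = require("aws-sdk");
//   console.log("aws-sdk", AWS.VERSION, "ok:", isAtLeast(AWS.VERSION));
```

Logging this once at cold start shows which SDK version the function actually loaded, which helps confirm whether an older bundled SDK is the cause of the UnexpectedParameter error.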

Upvotes: 1
