Reputation: 115
I'm trying to create Kinesis Firehose streams with Dynamic Partitioning through a Lambda function. The function creates a stream if I remove the DynamicPartitioningConfiguration and ProcessingConfiguration elements, and change the prefix to a Dynamic Partitioning friendly string. The issue is that if those elements are included, I get the following error:
kinesis error UnexpectedParameter: Unexpected key 'DynamicPartitioningConfiguration' found in params
Here is the function creating the streams. The ARNs and Glue info (redacted here) all work fine when the Dynamic Partitioning settings are removed.
const AWS = require("aws-sdk");
var firehose = new AWS.Firehose();
exports.handler = async (event) => {
let params = {
DeliveryStreamName: event.body.shop+"-data-stream",
DeliveryStreamType: "DirectPut",
ExtendedS3DestinationConfiguration: {
BucketARN: 'arn:aws:s3:::'+event.body.shop,
RoleARN: 'aValidARN',
Prefix: '!{partitionKeyFromQuery:obj-type}/',
ErrorOutputPrefix: 'errors/',
DataFormatConversionConfiguration: {
Enabled: true,
InputFormatConfiguration: {
Deserializer: {
OpenXJsonSerDe: {
CaseInsensitive: false,
ConvertDotsInJsonKeysToUnderscores: false
}
}
},
OutputFormatConfiguration: {
Serializer: {
ParquetSerDe: {
Compression: "SNAPPY",
WriterVersion: "V1"
}
}
},
SchemaConfiguration: {
RoleARN: "aValidARN",
DatabaseName: "aValidDBName",
TableName: "aValidTableName"
}
},
DynamicPartitioningConfiguration: {
Enabled: "true"
},
ProcessingConfiguration: {
Enabled: true,
Processors: [
{
Type: "MetadataExtraction",
Parameters: [
{
ParameterName: "MetadataExtractionQuery",
ParameterValue: 'type:.type'
},
{
ParameterName: "JsonParsingEngine",
ParameterValue: 'JQ-1.6'
}
]
}
]
},
},
Tags: [
{
Key: 'billing-'+event.body.shop,
Value: 'true'
}
]
};
try{
await firehose.createDeliveryStream(params).promise();
}catch(e){
console.log("kinesis error "+e);
return {
statusCode: 500,
error: JSON.stringify(e)
};
}
return {
statusCode: 200,
body: JSON.stringify("Stream created"),
};
};
The docs for this are of very little help. I'm using the JS SDK reference and the element is exactly where it should be, nested within the ExtendedS3DestinationConfiguration element. I suspect there is some undocumented conflict between settings and that this is why the "unexpected key" error is returned, but I'm new to Firehose and lack the knowledge to troubleshoot this. I've created the ProcessingConfiguration element based on the answer at https://stackoverflow.com/a/69047987/2755996 and the CloudFormation docs at https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-extendeds3destinationconfiguration.html#cfn-kinesisfirehose-deliverystream-extendeds3destinationconfiguration-dynamicpartitioningconfiguration .
Any help would be much appreciated
Upvotes: 0
Views: 2562
Reputation: 115
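The "Unexpected key" error is raised by the SDK's own parameter validation, so it appears the SDK version bundled with the Lambda runtime was too old to know about DynamicPartitioningConfiguration. The SDK can be added as a layer to a Lambda function, which makes more recent changes available. I used the instructions at https://aws.amazon.com/premiumsupport/knowledge-center/lambda-layer-aws-sdk-latest-version/ to update to v2.997 of the SDK. Individual packages from v3 and later of the SDK can also be added using this method.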
I changed the params to
// Parameters for Firehose createDeliveryStream: a DirectPut stream that
// converts JSON records to Parquet and writes them to S3 under a prefix built
// from partition keys extracted from each record.
params = {
  DeliveryStreamName: event.body.shop+"-data-stream",
  DeliveryStreamType: "DirectPut",
  ExtendedS3DestinationConfiguration: {
    BucketARN: 'arn:aws:s3:::'+event.body.shop,
    RoleARN: 'aWorkingARN',
    // Every key referenced here must be produced by a MetadataExtractionQuery
    // in the ProcessingConfiguration below.
    Prefix: '!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/!{partitionKeyFromQuery:object_type}/',
    ErrorOutputPrefix: 'errors/',
    // Declared exactly once: a repeated key in an object literal silently
    // replaces the earlier value, which would drop this explicit Enabled flag.
    DataFormatConversionConfiguration: {
      Enabled: true,
      InputFormatConfiguration: {
        Deserializer: {
          OpenXJsonSerDe: {
            CaseInsensitive: false,
            ConvertDotsInJsonKeysToUnderscores: false
          }
        }
      },
      OutputFormatConfiguration: {
        Serializer: {
          ParquetSerDe: {
            Compression: "SNAPPY",
            WriterVersion: "V1"
          }
        }
      },
      SchemaConfiguration: {
        RoleARN: "aWorkingARN",
        DatabaseName: "aWorkingDB",
        TableName: "aWorkingTable"
      }
    },
    DynamicPartitioningConfiguration: {
      // Boolean, not the string "true".
      Enabled: true
    },
    ProcessingConfiguration: {
      Enabled: true,
      Processors: [
        {
          Type: "MetadataExtraction",
          // NOTE(review): the AWS documentation shows a single
          // MetadataExtractionQuery whose JQ object lists all keys, e.g.
          // '{year:.year, month:.month}' — confirm that repeating the
          // parameter once per key is honoured for every key.
          Parameters: [
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{object_type:.object_type}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{hour:.hour}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{day:.day}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{month:.month}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{year:.year}'
            },
            {
              ParameterName: "JsonParsingEngine",
              ParameterValue: 'JQ-1.6'
            }
          ]
        }
      ]
    }
  },
  Tags: [
    {
      Key: 'billing-'+event.body.shop,
      Value: 'true'
    }
  ]
};
and the function worked perfectly.
Upvotes: 0