Saiteja Pamulapati

Reputation: 23

Data corruption in Kinesis Producer

I'm using the Kinesis producer to push some analytics data to AWS CloudWatch. This is the entire flow of the data: Kinesis producer (using the Java SDK) -> Kinesis stream -> AWS Lambda function -> AWS CloudWatch

Kinesis producer version used:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>

I push my analytics data to the producer with the following code:

kinesisProducer.addUserRecord(stream, "101", ByteBuffer.wrap(requestString.getBytes(StandardCharsets.UTF_8)));

Data being sent to the producer:

{
    "version": null,
    "namespace": "namespace",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

I came to know that the data is base64-encoded by Kinesis, so in the Lambda function I decode it like this:

const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');

The decoded payload comes out like this:

s   B
101fGaG
{
    "version": null,
    "namespace": "`$\u0017`$>`$0o?=o?=`$!`$\u0010`$*",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

Can anyone help me out with this issue?

Upvotes: 0

Views: 397

Answers (1)

Saiteja Pamulapati

Reputation: 23

The Kinesis Producer Library aggregates small user records into larger records of up to 1 MB. The Kinesis consumer has to deaggregate the records before processing them.

reference link

de-aggregation library
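
For example, in the Node.js Lambda the deaggregation can be done with the aws-kinesis-agg npm module before base64-decoding each user record. This is a minimal sketch, assuming that module's deaggregateSync API (decoding as utf8 rather than ascii also keeps multi-byte characters intact):

const deagg = require('aws-kinesis-agg');

exports.handler = async function (event) {
    for (const record of event.Records) {
        // Split each KPL-aggregated Kinesis record back into the original
        // user records; the second argument enables checksum verification.
        deagg.deaggregateSync(record.kinesis, true, function (err, userRecords) {
            if (err) {
                throw err;
            }
            for (const userRecord of userRecords) {
                // Each deaggregated record's data is still base64-encoded.
                const payload = Buffer.from(userRecord.data, 'base64').toString('utf8');
                console.log(payload);
            }
        });
    }
};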

Upvotes: 1
