lorilew

Reputation: 103

Has anyone experienced data loss when using AWS Kinesis Streams, Lambda and Firehose?

I'm currently sending a series of XML messages to an AWS Kinesis stream. I've used this setup on different projects, so I'm pretty confident that this bit works. I've then written a Lambda to process events from the Kinesis stream and forward them to Kinesis Firehose:

import os
import base64

import boto3

firehose = boto3.client('firehose')


def lambda_handler(event, context):
    deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']

    # Send each record directly to Firehose
    for record in event['Records']:
        # Kinesis payloads arrive base64-encoded in the Lambda event
        data = base64.b64decode(record['kinesis']['data'])

        response = firehose.put_record(
            DeliveryStreamName=deliveryStreamName,
            Record={'Data': data}
        )
        print(response)

I've set the Kinesis stream as the Lambda trigger, with a batch size of 1 and starting position LATEST.
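
For reference, the same trigger can be set up programmatically. A minimal sketch with boto3, where the stream ARN and function name are placeholders rather than the real ones:

import boto3

lambda_client = boto3.client('lambda')

# Attach the Kinesis stream as an event source: batch size 1, starting at LATEST
response = lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream',
    FunctionName='my-kinesis-to-firehose-lambda',
    BatchSize=1,
    StartingPosition='LATEST'
)
print(response['UUID'])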

For the Kinesis Firehose I have the following config:

Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled
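
(The same settings can be double-checked programmatically; a small sketch with boto3, assuming the delivery stream name lives in the same environment variable the Lambda uses:

import os

import boto3

firehose = boto3.client('firehose')

# Inspect the delivery stream configuration and status
description = firehose.describe_delivery_stream(
    DeliveryStreamName=os.environ['FIREHOSE_STREAM_NAME']
)['DeliveryStreamDescription']
print(description['DeliveryStreamStatus'])  # should print ACTIVE
)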

I sent 162 events, and when I read them back from S3 the most I've managed to get is 160; usually it's fewer. I've even tried waiting a few hours in case something strange was happening with retries.

Has anyone had experience using Kinesis -> Lambda -> Firehose and seen issues of lost data?

Upvotes: 9

Views: 3404

Answers (1)

k0lpak

Reputation: 553

From what I see here, the records are most likely being lost when you publish data to the Kinesis stream (not Firehose).

Since you are using put_record when writing to Firehose, it will throw an exception on failure, and the Lambda will be retried in that case. (It makes sense to check whether there are failures at that level; see the sketch below.)
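
One way to make such failures visible is to wrap the put_record call so the error is logged before the invocation fails and Lambda retries the batch. A sketch of the handler from the question with that change (everything else as in the original):

import os
import base64

import boto3
import botocore.exceptions

firehose = boto3.client('firehose')


def lambda_handler(event, context):
    deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']

    for record in event['Records']:
        data = base64.b64decode(record['kinesis']['data'])
        try:
            response = firehose.put_record(
                DeliveryStreamName=deliveryStreamName,
                Record={'Data': data}
            )
            print(response)
        except botocore.exceptions.ClientError as e:
            # Log the failure, then re-raise so the invocation fails
            # and Lambda retries the whole Kinesis batch
            print('put_record failed: {}'.format(e))
            raise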

Considering that, I suspect the records are lost before they reach the Kinesis stream. If you are sending items to the Kinesis stream using the put_records method, it does not guarantee that all the records will be delivered to the stream (due to exceeded write throughput or internal errors), so some of the records may fail to be sent. In that case the failed subset of records should be resent by your code (here is a Java example; sorry, I wasn't able to find a Python one, but see the sketch below).
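
Translated to Python, that resend loop might look roughly like this (a sketch only; the stream name, partition keys and XML payloads are placeholders):

import boto3

kinesis = boto3.client('kinesis')


def put_records_with_retry(stream_name, records, max_attempts=3):
    # Resend the failed subset reported by put_records until it is empty
    pending = records
    for _ in range(max_attempts):
        response = kinesis.put_records(StreamName=stream_name, Records=pending)
        if response['FailedRecordCount'] == 0:
            return
        # The response lists results in the same order as the request,
        # so keep only the records whose result carries an ErrorCode
        pending = [rec for rec, res in zip(pending, response['Records'])
                   if 'ErrorCode' in res]
    raise RuntimeError('%d records still failing after %d attempts'
                       % (len(pending), max_attempts))


# Each record needs Data (bytes) and a PartitionKey
xml_messages = ['<event>1</event>', '<event>2</event>']  # stand-ins for the real messages
records = [{'Data': msg.encode('utf-8'), 'PartitionKey': str(i)}
           for i, msg in enumerate(xml_messages)]
put_records_with_retry('my-stream', records)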

Upvotes: 0
