Jason4Ever

Reputation: 1479

How to ensure that messages sent with the Kinesis Producer Library get delivered

I'm using the KPL with AWS Lambda (Java) to produce to a Kinesis stream.

My code for adding messages is something like this:

ListenableFuture<UserRecordResult> f = KP.addUserRecord(Stream, partitionKey, ByteBuffer.wrap(data.getBytes()));
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            private final Logger LG = Logger.getLogger(this.getClass());

            @Override
            public void onSuccess(UserRecordResult result) {
                LG.info("Successfully sent " + result.getSequenceNumber() + " to stream shard #" + result.getShardId());
            }

            @Override
            public void onFailure(Throwable t) {
                // Pass the throwable as well so the stack trace is not lost
                LG.debug("Something went wrong while sending to the stream: " + t.getMessage(), t);
            }
        });

The problem is that sometimes, during the Lambda execution, the producer doesn't commit the message to Kinesis. To force the message to be pushed I have to call flushSync(), which causes other errors with Lambda later on.

Also, my KPL configuration is:

AggregationEnabled = true
AggregationMaxCount = 4294967295
AggregationMaxSize = 51200
CollectionMaxCount = 500
CollectionMaxSize = 5242880
ConnectTimeout = 6000
FailIfThrottled = false
MaxConnections = 24
MetricsGranularity = shard
MinConnections = 1
RateLimit = 150
RecordMaxBufferedTime = 3000
RecordTtl = 30000
RequestTimeout = 60000
VerifyCertificate = true
CredentialsRefreshDelay = 100

Upvotes: 1

Views: 2233

Answers (1)

Jugster

Reputation: 16

This is a known issue with the Kinesis KPL, which behaves differently from other AWS services. It has been raised as an issue but is not resolved yet. Also, calling flush() or flushSync() does not guarantee that everything is written, because Lambda can hit its timeout (external shutdown) before the buffered records are sent.
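One way to at least notice undelivered records is to keep the futures returned by addUserRecord and wait on them with a bounded time budget before the handler returns. The sketch below is only an illustration, not a fix for the underlying issue: it uses plain java.util.concurrent.Future (which Guava's ListenableFuture extends), and the class name BoundedAwait, the method awaitAll and the budget value are made up for this example. In a real handler the budget could be derived from the Lambda Context's getRemainingTimeInMillis(), minus some safety margin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: not part of the KPL or of the Lambda runtime.
final class BoundedAwait {

    /**
     * Waits for all futures against one shared deadline and returns the ones
     * that failed or did not complete within the budget.
     */
    static <T> List<Future<T>> awaitAll(List<? extends Future<T>> futures, long budgetMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
        List<Future<T>> unconfirmed = new ArrayList<>();
        for (Future<T> f : futures) {
            // Time left in the shared budget; never pass a negative timeout.
            long remaining = Math.max(deadline - System.nanoTime(), 0L);
            try {
                f.get(remaining, TimeUnit.NANOSECONDS);
            } catch (ExecutionException | TimeoutException e) {
                // Failed, or still pending when the budget ran out.
                unconfirmed.add(f);
            } catch (InterruptedException e) {
                // Keep the interrupt flag set and treat the record as unconfirmed.
                Thread.currentThread().interrupt();
                unconfirmed.add(f);
            }
        }
        return unconfirmed;
    }

    public static void main(String[] args) {
        // Stand-ins for the futures addUserRecord would return.
        List<Future<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("seq-1")); // already delivered
        futures.add(new CompletableFuture<String>());            // never completes

        List<Future<String>> unconfirmed = awaitAll(futures, 50);
        System.out.println("unconfirmed: " + unconfirmed.size());
    }
}
```

Anything left in the returned list was not confirmed within the budget, so the handler can log it or fail the invocation so that the event is retried, rather than silently assuming delivery.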

Upvotes: 0
