Reputation: 1479
I'm using the KPL with AWS Lambda (Java) to produce to a Kinesis stream.
My code for adding messages looks something like this:
// Queue the record; the KPL buffers it and sends it asynchronously.
ListenableFuture<UserRecordResult> f = KP.addUserRecord(Stream, partitionKey, ByteBuffer.wrap(data.getBytes()));
// Log the outcome once the KPL reports success or failure for this record.
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
    private Logger LG = Logger.getLogger(this.getClass());

    @Override
    public void onSuccess(UserRecordResult result) {
        LG.info("Successfully sent " + result.getSequenceNumber() + " to stream shard #" + result.getShardId());
    }

    @Override
    public void onFailure(Throwable t) {
        LG.debug("Something went wrong while sending to the stream: " + t.getMessage());
    }
});
The problem is that sometimes, during the Lambda execution, the producer doesn't commit the message to Kinesis. To force the message to be pushed, I have to call flushSync(), which causes other errors with Lambda later on.
Also, my KPL configuration is:
AggregationEnabled = true
AggregationMaxCount = 4294967295
AggregationMaxSize = 51200
CollectionMaxCount = 500
CollectionMaxSize = 5242880
ConnectTimeout = 6000
FailIfThrottled = false
MaxConnections = 24
MetricsGranularity = shard
MinConnections = 1
RateLimit = 150
RecordMaxBufferedTime = 3000
RecordTtl = 30000
RequestTimeout = 60000
VerifyCertificate = true
CredentialsRefreshDelay = 100
Upvotes: 1
Views: 2233
Reputation: 16
This is a known issue with the Kinesis KPL, which behaves differently from other AWS services. It has been raised as an issue but is not resolved yet. Also, calling flush or flushSync does not guarantee that every record is actually written, because Lambda can hit its timeout (an external shutdown) before the producer finishes.
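One way to at least find out how many records were confirmed before the function is stopped is to keep the futures returned by addUserRecord and wait on them with a time budget, instead of blocking indefinitely. The sketch below is only an illustration and not part of the KPL: the class name BoundedFlush and the method awaitAll are hypothetical, and it uses only JDK types (a ListenableFuture is also a java.util.concurrent.Future, so the KPL futures can be passed in as-is).

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a KPL class): waits on record futures with a shared time budget.
final class BoundedFlush {

    private BoundedFlush() {
    }

    /**
     * Waits for the given futures, spending at most budgetMillis in total.
     * Returns how many futures completed successfully within that budget.
     */
    static int awaitAll(List<? extends Future<?>> futures, long budgetMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Math.max(0L, budgetMillis));
        int succeeded = 0;
        for (Future<?> f : futures) {
            // Time left in the shared budget; zero means "only check whether it is already done".
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                f.get(remaining, TimeUnit.NANOSECONDS);
                succeeded++;
            } catch (ExecutionException | TimeoutException | CancellationException e) {
                // Failed, cancelled, or not finished in time: treat the record as unconfirmed.
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return succeeded;
    }
}
```

In a handler, the budget could be derived from the Lambda Context, for example awaitAll(futures, context.getRemainingTimeInMillis() - 500), where the 500 ms safety margin is an assumed value to tune. If the returned count is lower than the number of records added, the handler can log or fail the invocation rather than silently losing data. This does not fix the underlying KPL behaviour; it only makes the loss visible.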
Upvotes: 0