Edmondo

Reputation: 20090

IRecordsProcessor processRecords not invoked by KCL library

We ran the following test several times, and we are struggling to find a reasonable explanation for why this could be happening:

Our stream has one shard, and sometimes the consumer is not notified of the records. We use different workerIds, but it is possible that another application with the same ApplicationName steals the records.

What are the reasons why a KCL consumer might never receive records that have just been published?

Upvotes: 3

Views: 1356

Answers (2)

Edmondo

Reputation: 20090

The problem with my code was that once you have created an application for the first time in the KCL, the matching entry used for checkpointing in DynamoDB stores the last position of the shard iterator.

When you restart the application, the shard iterator might lag far behind the current time, and it could take a lot of iterations before you reach the current position, where your newly published records are.
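To get a feel for how long "a lot of iterations" can be, here is a rough back-of-the-envelope sketch. The helper name and the numbers are made up for illustration, and it assumes every fetch returns a full batch and that no new records arrive while catching up, which is optimistic.

```java
public class CatchUpEstimate {

    // Hypothetical helper (not part of the KCL): how many fetches are needed
    // to drain a backlog if each fetch returns at most recordsPerFetch records.
    public static long fetchesToCatchUp(long backlogRecords, int recordsPerFetch) {
        if (recordsPerFetch <= 0) {
            throw new IllegalArgumentException("recordsPerFetch must be positive");
        }
        if (backlogRecords <= 0) {
            return 0;
        }
        // Ceiling division: a partial last batch still costs one fetch.
        return (backlogRecords + recordsPerFetch - 1) / recordsPerFetch;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 250,000 records behind, 1,000 per fetch.
        System.out.println(fetchesToCatchUp(250_000, 1_000)); // prints 250
    }
}
```

Until those fetches are done, the record you just published is simply not reached yet, which looks like processRecords never being called for it.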

So for testing purposes you probably need to create a new app every time and clean up your DynamoDB tables every night.
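One way to get "a new app every time" in a test is to generate the application name per run. This is only a sketch: `uniqueApplicationName` and the base name are hypothetical, and you would still pass the resulting string to whatever KCL configuration you already use.

```java
import java.util.UUID;

public class TestAppName {

    // Hypothetical helper: appends a random suffix so each test run gets
    // its own application name, and therefore no old checkpoint to resume from.
    public static String uniqueApplicationName(String base) {
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // "my-kcl-test" is an illustrative base name, not from the question.
        System.out.println(uniqueApplicationName("my-kcl-test"));
    }
}
```

Each name leaves its own checkpoint table behind, which is why the nightly cleanup is still needed.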

Upvotes: 1

prisco.napoli

Reputation: 616

It seems you are using many workers per application name but your stream has only one shard. Only one worker can be active per shard and retrieve data; all the others will be idle.

You should ensure that the number of workers does not exceed the number of shards (or have just a few more for resiliency).

With Kinesis you can have many workers per application name in order to parallelise data retrieval from the shards. Workers with the same application name are assumed to be working together on the same stream.

However, each shard can be accessed by only one worker at a time, i.e. two workers with the same application name can't read from the same shard at the same time. If you have N workers but only 1 shard, there will always be exactly 1 worker running, while the other N-1 will be idle. Which worker is the active one is a matter of scheduling.

If you want many workers that consume data from the same shard in parallel, you need to run an additional instance of your application, but with a different application name. The second instance is considered an entirely separate application that is also operating on the same stream.

KCL uses DynamoDB to handle state information per application name (e.g. number of shards, workers, checkpoints, worker-shard mapping, etc.). Each application name should be unique per region and has its own DynamoDB table.
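To illustrate why a different application name gives you an independent consumer, here is a toy in-memory model of "one state table per application name". It is not the KCL's real table schema: the class and method names are hypothetical, and sequence numbers are simplified to `long` values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CheckpointTables {

    // application name -> (shard id -> last checkpointed position)
    // Stands in for "one DynamoDB table per application name".
    private final Map<String, Map<String, Long>> tables = new HashMap<>();

    // Records the position up to which this application has processed the shard.
    public void checkpoint(String applicationName, String shardId, long position) {
        tables.computeIfAbsent(applicationName, k -> new HashMap<>())
              .put(shardId, position);
    }

    // Returns the last checkpoint, or -1 if this application has never
    // checkpointed the shard (i.e. it is a brand new application).
    public long lastCheckpoint(String applicationName, String shardId) {
        return tables
                .getOrDefault(applicationName, Collections.<String, Long>emptyMap())
                .getOrDefault(shardId, -1L);
    }

    public static void main(String[] args) {
        CheckpointTables state = new CheckpointTables();
        state.checkpoint("app-A", "shard-0", 42L);
        System.out.println(state.lastCheckpoint("app-A", "shard-0")); // 42
        System.out.println(state.lastCheckpoint("app-B", "shard-0")); // -1
    }
}
```

In this model `app-B` has no checkpoint for the shard even though `app-A` has already processed part of it, which mirrors how two application names read the same stream independently.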

When a worker starts, it queries DynamoDB to see whether a table for the application name exists. If no table is present, it creates a new one and writes its state to it. The workers automatically discover the shards and create processors to handle the data from them.

A worker can read data from many shards; it uses the DynamoDB table to keep track of which records it has retrieved, and uses it to get new shard iterators. However, as I said before, a shard can be accessed by only one worker at a time (of course I'm referring to workers with the same application name).

If a second worker starts, it queries DynamoDB and this time finds that a table for the application and stream exists. So it just registers itself by writing its status to the table. Via the DynamoDB table, the workers can discover each other and divide the work, e.g. each worker can read from half of the shards if more than one shard exists. Otherwise, the new worker will be idle.
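The division of work can be pictured with a toy assignment. This is a simplified round-robin model, not the KCL's actual lease-balancing algorithm; the class name, worker ids and shard ids are all made up. In the real library, which worker ends up active depends on scheduling, whereas here it is simply the first one in the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeaseSketch {

    // Toy model: each shard is owned by exactly one worker, shards are
    // handed out round-robin. Workers left with an empty list are idle.
    public static Map<String, List<String>> assign(List<String> shards,
                                                   List<String> workers) {
        Map<String, List<String>> byWorker = new LinkedHashMap<>();
        for (String worker : workers) {
            byWorker.put(worker, new ArrayList<>());
        }
        if (workers.isEmpty()) {
            return byWorker; // nobody to assign shards to
        }
        for (int i = 0; i < shards.size(); i++) {
            String owner = workers.get(i % workers.size());
            byWorker.get(owner).add(shards.get(i));
        }
        return byWorker;
    }

    public static void main(String[] args) {
        // One shard, three workers: two of them have nothing to do.
        System.out.println(assign(
                Arrays.asList("shard-0"),
                Arrays.asList("w1", "w2", "w3")));
        // prints {w1=[shard-0], w2=[], w3=[]}
    }
}
```

With four shards and two workers the same function gives each worker two shards, which is the "half of the shards" case.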

One last thing. If I remember correctly, it can take several seconds (up to 10) between a record being sent to Kinesis and it being ready to be consumed by a worker.

Hope this helps. Check these links for more information:

http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-worker

http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html

Upvotes: 0
