Reputation: 2186
I'm very new to Amazon Kinesis so maybe this is just a problem in my understanding but in the AWS Lambda FAQ it says:
The Amazon Kinesis and DynamoDB Streams records sent to your AWS Lambda function are strictly serialized, per shard. This means that if you put two records in the same shard, Lambda guarantees that your Lambda function will be successfully invoked with the first record before it is invoked with the second record. If the invocation for one record times out, is throttled, or encounters any other error, Lambda will retry until it succeeds (or the record reaches its 24-hour expiration) before moving on to the next record. The ordering of records across different shards is not guaranteed, and processing of each shard happens in parallel.
My question is, what happens if for some reason some malformed data gets put onto a shard by a producer and when the Lambda function picks it up it errors out and then just keeps retrying constantly? This then means that the processing of that particular shard would be blocked for 24 hours by the error.
Is the best practice to handle application errors like that by wrapping the problem in a custom error and sending this error downstream along with all the successfully processed records and let the consumer handle it? Of course, this still wouldn't help in the case of an unrecoverable error that crashed the program like a null pointer: again we'd be back to the blocking retry loop for the next 24 hours.
Upvotes: 36
Views: 30046
Reputation: 908
In your Lambda you can either throw an error, which hands the whole batch back for a retry, or not throw and instead push the failing records to an SQS queue so they can be handled separately. SQS can retain messages for up to 14 days. You could also checkpoint each record so you know whether it was already processed in a previous run.
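The per-record checkpoint idea could look roughly like the sketch below. It assumes a hypothetical `checkpoints` store (an in-memory `Map` here, standing in for something durable such as a DynamoDB table) and a hypothetical `processRecord` callback. `eventID` and `kinesis.sequenceNumber` are fields of the Kinesis records that Lambda delivers.

```javascript
// Hypothetical in-memory checkpoint store: shardId -> last processed sequence number.
// In a real function this would need to live in durable storage (e.g. DynamoDB).
const checkpoints = new Map();

// eventID has the form "<shardId>:<sequenceNumber>".
function shardIdOf(record) {
  return record.eventID.split(':')[0];
}

// Sequence numbers are long decimal strings, so compare them as BigInt.
function alreadyProcessed(record) {
  const last = checkpoints.get(shardIdOf(record));
  return last !== undefined && BigInt(record.kinesis.sequenceNumber) <= last;
}

// Returns the number of records actually processed in this run.
function processBatch(records, processRecord) {
  let handled = 0;
  for (const record of records) {
    if (alreadyProcessed(record)) continue; // done in a previous run
    processRecord(record);                  // may throw; checkpoint stays put
    checkpoints.set(shardIdOf(record), BigInt(record.kinesis.sequenceNumber));
    handled += 1;
  }
  return handled;
}
```

If the batch is redelivered after a failure, records at or below the stored sequence number are skipped, so only the failing record and those after it are processed again.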
If you have a lot of incoming data and you don't want to introduce any latency, you could just ignore the error and move on, while adding the failed events to an SQS queue.
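A minimal sketch of the "don't throw, set the record aside" option follows. `splitBatch` and `processRecord` are hypothetical names, and the actual send to SQS is left out; it would be done with whichever AWS SDK client the function already uses.

```javascript
// Process every record; collect failures instead of throwing,
// so one bad record does not fail (and replay) the whole batch.
function splitBatch(records, processRecord) {
  const failed = [];
  let succeeded = 0;
  for (const record of records) {
    try {
      processRecord(record);
      succeeded += 1;
    } catch (err) {
      // Keep enough context to inspect or replay the record later.
      failed.push({
        eventID: record.eventID,
        data: record.kinesis.data, // still base64 encoded
        error: String(err && err.message ? err.message : err),
      });
    }
  }
  return { succeeded, failed };
}
```

Each entry in `failed` can then be serialized with `JSON.stringify` and sent as an SQS message body before the handler reports success.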
Upvotes: 0
Reputation: 12901
This is a common question on processing events in Kinesis and I'll try to give you some points to build your Lambda function to handle such issues with "corrupted" data. Since it is best practice to have separated parts of your system writing to the Kinesis stream and other parts reading from the Kinesis stream, it is common that you will have such problems.
First, why do you have such problematic events?
Using Kinesis to process your events is a good way to break up a complex system that does both front-end processing (serving end users) and back-end processing (analyzing events) at the same time and in the same code, into two independent parts. The front-end people can focus on their business, while the back-end people don't need to push code changes to the front-end if they want to add functionality to serve their analytic use cases. Kinesis is a buffer of events that both removes the need for synchronization and simplifies the business logic code.
Therefore, we would like events written to the stream to be flexible in their "schema", and if the front-end teams wish to change the event format, add fields, delete fields, change the protocol or the encryption keys, they should be able to do that as often as they want.
Now it is up to the teams that are reading from the stream to process such flexible events efficiently, and not break their processing every time such a change happens. Therefore, you should expect your Lambda function to see events that it can't process, and a "poison pill" is not as rare an event as you might expect.
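One way to keep a consumer tolerant of format changes is to decode each record defensively and treat anything unparseable as a value rather than an exception. The sketch below assumes the producers write JSON; `parseRecord` and the check for a `type` field are hypothetical.

```javascript
// Decode one Kinesis record without ever throwing.
// Returns { ok: true, event } or { ok: false, reason }.
function parseRecord(record) {
  let event;
  try {
    const text = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    event = JSON.parse(text);
  } catch (err) {
    return { ok: false, reason: 'not valid JSON: ' + err.message };
  }
  // Assumption for this sketch: the consumer only needs a string `type` field.
  // Unknown extra fields are ignored so producers can add them freely.
  if (event === null || typeof event !== 'object' || typeof event.type !== 'string') {
    return { ok: false, reason: 'missing "type" field' };
  }
  return { ok: true, event };
}
```

Because the function only checks the fields it actually reads, a producer adding new fields does not turn valid events into poison pills.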
Second, how do you handle such problematic events?
Your Lambda function will get a batch of events to process. Please note that you shouldn't get the events one by one, but in large batches of events. If your batches are too small, you will quickly get large lags on the stream.
For each batch you will iterate over the events, process them, and then checkpoint the last sequence ID of the batch in DynamoDB. Lambda does most of these steps for you automatically (see more here: http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):
console.log('Loading function');

exports.handler = function(event, context) {
  console.log(JSON.stringify(event, null, 2));
  event.Records.forEach(function(record) {
    // Kinesis data is base64 encoded, so decode it here.
    // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
    console.log('Decoded payload:', payload);
  });
  // Signal success so Lambda moves on to the next batch.
  context.succeed();
};
This is what is happening in the "happy path", if all the events are processed without any problem. But if you encounter any problem in the batch and you don't "commit" the events with the success notification, the batch will fail and you will get all the events in the batch again.
Now you need to determine the reason for the processing failure.
Temporary problem (throttling, network issue...) - it is OK to wait a second and try again for a couple of times. In many cases the issue will resolve itself.
Occasional problem (out of memory...) - it is best to increase the memory allocation of the Lambda function or decrease the batch size. In many cases such modification will resolve the issue.
Constant failure - it means that you have to either ignore the problematic event (put it in a DLQ - dead-letter-queue) or modify your code to handle it.
The problem is to identify the type of failure in your code and handle it differently. You need to write your Lambda code in a way to identify it (type of exception, for example) and react differently.
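A sketch of that classification, under hypothetical assumptions: errors carrying a `retryable` flag or a `code` from a short list are treated as temporary, and everything else as a constant failure. Which codes belong on the list depends on the services your function calls.

```javascript
// Hypothetical list of error codes treated as temporary.
const TRANSIENT_CODES = new Set(['ThrottlingException', 'ETIMEDOUT', 'ECONNRESET']);

function classifyError(err) {
  if (err && (err.retryable === true || TRANSIENT_CODES.has(err.code))) {
    return 'temporary';
  }
  return 'constant';
}

// Try a record a few times. Returns 'ok', or 'dead-letter' for a record
// that should be set aside. Throws if a temporary error persists, so that
// the whole batch is retried later.
function handleRecord(record, processRecord, maxAttempts = 3) {
  for (let attempt = 1; ; attempt += 1) {
    try {
      processRecord(record);
      return 'ok';
    } catch (err) {
      if (classifyError(err) === 'constant') return 'dead-letter';
      if (attempt >= maxAttempts) throw err;
      // A real handler would wait briefly (back off) before the next attempt.
    }
  }
}
```

Only a persistent temporary error is allowed to fail the batch; a record that can never succeed is reported as `'dead-letter'` so the shard keeps moving.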
You can use the integration with CloudWatch to write such failures to the log and create the relevant alarms. You can also use CloudWatch Logs as a simple "dead-letter queue" log, to see what the source of the problem is.
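Since anything a Lambda function writes with `console` ends up in CloudWatch Logs, a rejected event can be logged as a single JSON line that is easy to search or to build a metric filter on. The `DEAD_LETTER` marker and the field names below are hypothetical.

```javascript
// Build one JSON log line describing a record that was set aside.
function deadLetterLogLine(record, err) {
  return JSON.stringify({
    marker: 'DEAD_LETTER',            // hypothetical tag to filter on
    eventID: record.eventID,
    partitionKey: record.kinesis.partitionKey,
    sequenceNumber: record.kinesis.sequenceNumber,
    error: err.message,
    data: record.kinesis.data,        // base64 payload, kept for replay
  });
}

// Write the line to stderr, which Lambda forwards to CloudWatch Logs.
function logDeadLetter(record, err) {
  console.error(deadLetterLogLine(record, err));
}
```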
Upvotes: 24
Reputation: 3629
Don't overthink it: Kinesis is just a queue. You have to consume a record (i.e. pop it from the queue) successfully in order to proceed to the next one, just like any FIFO queue.
The appropriate approach should be:
By the way, if processing a record takes more than 1 minute, you are almost certainly doing something wrong. Kinesis is designed to handle thousands of records per second, so you do not have the luxury of running such long jobs for each of them.
The question you are asking is a general problem of queue systems, sometimes called the "poison message" problem. You have to handle such messages in your business logic to be safe.
http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages
Upvotes: 42