Arpan Jain

Reputation: 133

How to ensure ordering while putting records into a Kinesis stream asynchronously?

I am writing an application that reads MySQL binlogs and pushes the changes into a Kinesis stream. My use case requires perfect ordering of the MySQL events in the Kinesis stream, so I am using the PutRecord operation instead of PutRecords and also including the 'SequenceNumberForOrdering' key. But one point of failure still remains: the retry logic. Since the write is asynchronous (I am using the AWS SDK for JavaScript), how can I ensure ordering if a write operation to Kinesis fails?
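
For context, this is roughly what my put path looks like (a minimal sketch; the stream name, partition key, and region below are placeholders, and error handling is omitted):

// Minimal sketch of the current put path; StreamName, PartitionKey and region are placeholders.
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

let lastSequenceNumber = null; // SequenceNumber returned by the previous putRecord

function putBinlogEvent(event, callback) {
    const params = {
        StreamName: 'mysql-binlog-stream',   // placeholder
        PartitionKey: 'binlog',              // a single key keeps all events on one shard
        Data: JSON.stringify(event)
    };
    if (lastSequenceNumber) {
        params.SequenceNumberForOrdering = lastSequenceNumber;
    }
    kinesis.putRecord(params, (err, result) => {
        if (err) {
            return callback(err); // a retry at this point is what can break the ordering
        }
        lastSequenceNumber = result.SequenceNumber;
        return callback(null, result);
    });
}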

Is a blocking write (blocking the event loop until the callback for the put record is received) too bad a solution? Or is there a better way?

Upvotes: 3

Views: 3671

Answers (3)

anon

Reputation: 56

Rather than try to enforce ordering when adding records to the stream, order the records when you read them. In your use case, every binlog entry has a unique file sequence, starting position, and ending position. So it is trivial to order them and identify any gaps.

If you do find gaps when reading, the consumers will have to wait until they're filled. However, assuming no catastrophic failures, all records should be close to each other in the stream, so the amount of buffering should be minimal.
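
As a rough sketch of what that could look like on the consumer side (the field names are assumptions based on what the producer stores with each record, e.g. the binlog file name and start/end positions):

// Sketch of consumer-side ordering and gap detection; binlogName, binlogPos and
// binlogEndPos are assumed fields carried in each record by the producer.
function orderBinlogRecords(records) {
    // sort by (binlogName, binlogPos); names like mysql-bin.000123 sort lexicographically
    return records.slice().sort((a, b) =>
        a.binlogName === b.binlogName
            ? a.binlogPos - b.binlogPos
            : a.binlogName.localeCompare(b.binlogName));
}

function findGaps(orderedRecords) {
    const gaps = [];
    for (let i = 1; i < orderedRecords.length; i++) {
        const prev = orderedRecords[i - 1];
        const curr = orderedRecords[i];
        // within the same binlog file, the next event should start where the previous one ended
        if (prev.binlogName === curr.binlogName && prev.binlogEndPos !== curr.binlogPos) {
            gaps.push({ after: prev, before: curr }); // buffer until the missing events arrive
        }
    }
    return gaps;
}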

By enforcing ordering on the producer side, you are limiting your overall throughput to how fast you can write individual records. If you can keep up with the actual database changes, then that's OK. But if you can't keep up you'll have ever-increasing lag in the pipeline, even though the consumers may be lightly loaded.

Moreover, you can only enforce order within a single shard, so if your producer ever needs to ingest more than 1 MB/second (or > 1,000 records/second) you are out of luck (and in my experience, the only way you'd reach 1,000 records/second is via PutRecords; if you're writing a single record at a time, you'll get around 20-30 requests/second).

Upvotes: 4

Arpan Jain

Reputation: 133

I was able to achieve perfect ordering by using an internal FIFO queue. I push every event into the FIFO queue, which is read by a recursive function that puts the events into the Kinesis stream one at a time. I also store the binlog offset in external storage (Redis in my case) after each successful putRecord operation, so if any write to Kinesis fails I can restart the server and resume reading from the last successful offset.

Any suggestions on this solution, or a different solution altogether, would be highly appreciated.

Here is a code snippet of my recursive function that reads from the FIFO queue.

const fetchAndPutEvent = () => {
    let currentEvent = eventQueue.shift(); // dequeue from the FIFO queue

    if (currentEvent) {
        currentEvent = JSON.parse(currentEvent);
        // put the event in the Kinesis stream, passing the sequence number returned by the
        // last successful putRecord as SequenceNumberForOrdering to preserve event ordering
        // (streamName and partitionKey are configured elsewhere in the application)
        return kinesis.putRecord({
            StreamName: streamName,
            PartitionKey: partitionKey,
            Data: JSON.stringify(currentEvent),
            SequenceNumberForOrdering: sequenceNumber
        }, (err, result) => {
            if (err) {
                // if the put fails, kill the server and replay from the last successful offset
                logger.fatal('Error in putting kinesis record', err);
                return setTimeout(() => {
                    process.exit(0);
                }, 10000);
            }
            try {
                // store the binlog offset and Kinesis sequence number in external memory (Redis)
                sequenceNumber = result.SequenceNumber;
                let offsetObject = {
                    binlogName: currentEvent.currentBinlogName,
                    binlogPos: currentEvent.currentBinlogPos,
                    sequenceNumber: sequenceNumber
                };
                redisClient.hmset(redisKey, offsetObject);
            }
            catch (ex) {
                logger.fatal('Exception in putting kinesis record', ex);
                setTimeout(() => {
                    process.exit(0);
                }, 10000);
            }
            // process the next queued event
            return setImmediate(fetchAndPutEvent);
        });
    } else {
        // queue is empty: check again on the next tick of the event loop
        return setImmediate(fetchAndPutEvent);
    }
};
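
For completeness, the restart path looks roughly like this (a sketch only; startBinlogReader is a hypothetical function that resumes reading the MySQL binlog from a given offset):

// Sketch of the replay-from-offset path; startBinlogReader is hypothetical.
const resumeFromLastOffset = () => {
    redisClient.hgetall(redisKey, (err, offset) => {
        if (err || !offset) {
            // no stored offset yet: start reading from the current binlog position
            return startBinlogReader(null);
        }
        // reuse the stored sequence number for SequenceNumberForOrdering on the next put
        sequenceNumber = offset.sequenceNumber;
        return startBinlogReader({
            binlogName: offset.binlogName,
            binlogPos: Number(offset.binlogPos) // hmset stores values as strings
        });
    });
};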

Upvotes: 1

Brian Ecker

Reputation: 2087

If you want perfect ordering, then you need to make sure that each event is inserted before the next one is sent, so yes, you have to wait until one put request finishes before executing the next. The real question is whether you actually need perfect ordering across all events, or only within some subset. Because you're working with a relational database, it's highly unlikely that you have relations between rows within the same table; it's more likely that you have relations between rows in different tables. So you can probably take advantage of bulk put requests with a couple of tricks.

The problem with a bulk put request is that it is unordered within the request. Because the binlog gives you the complete image of the row after each change, you only care about the most recent binlog entry for each primary key. So you could instead collect a relatively large batch of events from the binlog (which are already ordered by time), group them by primary key, and keep only the after_values image of the latest record in each primary key group. You can then safely use a bulk put request for these records without risking that a stale record for a given key lands in the stream before the most up-to-date record for that key.

This won't be sufficient for all cases, but in many CDC (https://en.wikipedia.org/wiki/Change_data_capture) setups, this will be enough to accurately replicate data into some other system.

Say you have the following records in your bin log (format taken from https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/):

{"table": "Users", "row": {"values": {"id": 1, "Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Foo User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 3, "Name": "User C", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}

In this example there are three rows identified by the primary key id. The row with id=1 is inserted and then updated twice, the row with id=2 is inserted, and the row with id=3 is inserted. You need to handle each type of event (write, update, delete) separately, and collect only the latest state for each id. So for writes, you'd take the values for the row, for updates you'd take the after_values for the row, and for deletes you'd put the row into a batch of deletes. In this example the only three entries that matter are:

{"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 3, "Name": "User B", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}

This is because they are the latest versions for each id. You can use a bulk put for a batch containing these three writes and not have to worry about them being out of order, unless you have inter-dependencies between entries in a single table or some other very specific requirement.

If you have deletes, you simply put them in a separate bulk delete that you execute after the bulk put of records. In the past I've seen really nice throughput improvements from this compaction and batching procedure. But again, if you actually need to read every event, rather than just copy the latest data to various other stores, then this might not work.
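
A rough sketch of that compaction-and-batch idea, assuming the record format shown above, the v2 AWS SDK for JavaScript, and a placeholder stream name (splitting batches to stay under the 500-record PutRecords limit is left out):

// Sketch only: compact a time-ordered batch of binlog events down to the latest
// image per (table, id), then write the survivors with a single PutRecords call.
function compactBatch(events) {
    const latest = new Map();  // "table:id" -> latest row image to upsert
    const deletes = new Map(); // "table:id" -> row image to delete
    for (const event of events) { // events arrive in binlog (time) order
        const row = event.type === 'UpdateRowsEvent' ? event.row.after_values : event.row.values;
        const key = `${event.table}:${row.id}`;
        if (event.type === 'DeleteRowsEvent') {
            latest.delete(key);
            deletes.set(key, { table: event.table, row });
        } else {
            deletes.delete(key);
            latest.set(key, { table: event.table, row });
        }
    }
    return { upserts: [...latest.values()], deletes: [...deletes.values()] };
}

async function putCompactedBatch(kinesis, events) {
    const { upserts } = compactBatch(events);
    if (upserts.length === 0) return;
    await kinesis.putRecords({
        StreamName: 'mysql-binlog-stream', // placeholder
        Records: upserts.map(u => ({
            PartitionKey: `${u.table}:${u.row.id}`, // same key -> same shard for a given row
            Data: JSON.stringify(u)
        }))
    }).promise();
    // deletes would go into a separate bulk operation executed after the puts, as described above
}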

Upvotes: 2
