Pastuszka Przemek

Reputation: 73

How do custom unbounded sources work in Google Cloud Dataflow?

I'm trying to implement a custom unbounded source for Google Cloud Dataflow that reads from an Amazon Kinesis queue. To implement checkpointing properly, I'd like to understand exactly how the mechanism works.

How Dataflow works

I tried to understand checkpoints by reading the Dataflow documentation, but some crucial things are missing, so I went through the MillWheel paper. Let me first explain how I understood the concept laid out in this paper. I will focus on the interaction between a source and its consumer in the strong production setup, in terms of the Dataflow API:

Pub/Sub vs. Kinesis

Now, let me say a few words about how a Kinesis queue operates, because it differs significantly from Pub/Sub (as far as I understand how Pub/Sub works; I haven't used it myself).

Pub/Sub

I see that the Pub/Sub pull model relies heavily on ACKs, i.e. the messages received by the client are ACKed and then the "internal checkpoint" in Pub/Sub moves forward. This means that the next pull request will receive the records that follow the previously ACKed ones.

Kinesis

The Kinesis pull interface (there's no push here at all) is more similar to how you interact with a file. You can start reading at any location in the stream (with the special values TRIM_HORIZON being the oldest record in the stream and LATEST being the latest record in the stream) and then move forward record by record using an iterator (iterators are stored on the server side and expire after 5 minutes if unused). There are no ACKs to the server: it is the client's responsibility to keep track of its position in the stream, and you can always re-read old records (unless they have expired, of course).
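To make the file-like model concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not the AWS SDK: ToyShard, ToyShardClient and all of their methods are hypothetical names, and integer positions stand in for Kinesis sequence numbers and iterators. It only illustrates that the "server" keeps no per-client progress and the client tracks its own position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for a single shard; this is NOT the AWS SDK. */
final class ToyShard {
    private final List<String> records = new ArrayList<>();

    void put(String data) {
        records.add(data);
    }

    /** Position of the oldest retained record (plays the role of TRIM_HORIZON). */
    int trimHorizon() {
        return 0;
    }

    /** Position just past the newest record (plays the role of LATEST). */
    int latest() {
        return records.size();
    }

    /** Returns up to max records starting at position; no per-client state is kept here. */
    List<String> read(int position, int max) {
        int start = Math.min(Math.max(position, 0), records.size());
        int end = Math.min(records.size(), start + max);
        return new ArrayList<>(records.subList(start, end));
    }
}

/** The client, not the server, remembers how far it has read. */
final class ToyShardClient {
    private final ToyShard shard;
    private int position;

    ToyShardClient(ToyShard shard, int startPosition) {
        this.shard = shard;
        this.position = startPosition;
    }

    /** Reads the next batch and advances the client-side position. */
    List<String> poll(int max) {
        List<String> batch = shard.read(position, max);
        position += batch.size();
        return batch;
    }

    /** Moves back (or forward) to re-read from an arbitrary position. */
    void seek(int newPosition) {
        position = newPosition;
    }

    int position() {
        return position;
    }
}
```

Because nothing is acknowledged to the shard, losing the client-side position means losing track of progress, which is why the position has to end up in the checkpoint.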

Question / Issues

Cheers, Przemek

Upvotes: 3

Views: 874

Answers (1)

Dan Halperin

Reputation: 2247

We are excited to see that you’re using Dataflow with Kinesis. We would love a pull request to our GitHub project with a contrib connector for Kinesis. We would also be happy to review your code via GitHub as you develop and give you feedback there.

What should the checkpoint look like? Is a reader, given a checkpoint, expected to read just the part of the data it relates to, or is it expected to read all data from the checkpoint onward? In other words, should my checkpoint be like "data between x and y" or "all data after x"?

The checkpoint mark should represent “data that has been produced and finalized by this reader”. E.g., if a reader is responsible for a specific shard, the checkpoint mark might consist of the shard identifier and the last sequence number Y within that shard that has been successfully read, indicating “all data up to and including Y has been produced”.
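As an illustration of such a checkpoint mark, here is a minimal sketch in plain Java. ShardCheckpoint and its methods are hypothetical names; a real implementation would additionally implement the SDK's checkpoint mark interface (including finalizeCheckpoint()) and provide a coder, both omitted here. The sketch assumes sequence numbers are decimal strings that are compared numerically, since lexicographic comparison would order "99" after "100".

```java
import java.math.BigInteger;

/**
 * Hypothetical checkpoint for one shard: "everything in shardId up to and
 * including lastSequenceNumber has been produced".
 */
final class ShardCheckpoint {
    private final String shardId;
    private final BigInteger lastSequenceNumber;

    ShardCheckpoint(String shardId, String lastSequenceNumber) {
        this.shardId = shardId;
        // Assumption: sequence numbers are decimal strings, so parse them as numbers.
        this.lastSequenceNumber = new BigInteger(lastSequenceNumber);
    }

    String shardId() {
        return shardId;
    }

    String lastSequenceNumber() {
        return lastSequenceNumber.toString();
    }

    /** True if the given record is already covered by this checkpoint. */
    boolean covers(String otherShardId, String sequenceNumber) {
        return shardId.equals(otherShardId)
                && new BigInteger(sequenceNumber).compareTo(lastSequenceNumber) <= 0;
    }
}
```

This is the "all data up to and including Y" shape rather than a "data between x and y" range: a single position per shard is enough to describe what has been produced.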

I know that the first reader gets null as its checkpoint mark and that's perfectly fine: it means that I should start reading from the point defined by the application developer. But can Dataflow create other readers with null like this (for example, I'd imagine the situation where the reader JVM dies and Dataflow then creates a new one, with a new reader, passing null as the checkpoint)? In such a situation I don't know what my starting position is, as I might have already read some data using the previous reader and now the mark of progress is lost.

Finalized checkpoints are persisted, even across JVM failure. In other words, when a JVM dies, the reader will be constructed with the last checkpoint that has been finalized. You should not see readers created with null checkpoints unless they are intended to read from the beginning of a source, or in your scenario when the JVM died before the first successful call to finalizeCheckpoint(). You can use the checkpoint mark at the new reader to construct a new iterator for the same shard that starts from the next record to be read, and you can continue without data loss.
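The choice of starting point for a newly constructed reader can be sketched as follows. This is plain Java with hypothetical names (StartPosition, forReader); the strings TRIM_HORIZON, LATEST and AFTER_SEQUENCE_NUMBER mirror Kinesis shard iterator types, but no AWS or Dataflow API is called here.

```java
/** Hypothetical description of where a new reader should begin reading a shard. */
final class StartPosition {
    final String iteratorType;
    final String sequenceNumber; // null unless the iterator type needs one

    private StartPosition(String iteratorType, String sequenceNumber) {
        this.iteratorType = iteratorType;
        this.sequenceNumber = sequenceNumber;
    }

    /**
     * @param initialIteratorType where to start when there is no checkpoint,
     *        e.g. "TRIM_HORIZON" or "LATEST" (chosen by the application developer)
     * @param lastFinalizedSequenceNumber sequence number from the restored
     *        checkpoint, or null if the reader was created without one
     */
    static StartPosition forReader(String initialIteratorType,
                                   String lastFinalizedSequenceNumber) {
        if (lastFinalizedSequenceNumber == null) {
            // No checkpoint: start from the configured initial position.
            return new StartPosition(initialIteratorType, null);
        }
        // Checkpoint restored: continue with the record after the last one produced.
        return new StartPosition("AFTER_SEQUENCE_NUMBER", lastFinalizedSequenceNumber);
    }
}
```

For example, StartPosition.forReader("TRIM_HORIZON", null) describes a fresh start, while StartPosition.forReader("TRIM_HORIZON", "42") describes resuming right after sequence number 42.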

What ID is used for deduplication of records on the consumer side? Is it the value returned by getCurrentRecordId? I'm asking because I thought about using the position in the stream for that, since it's unique within a particular stream. But what would happen if I later join a few Kinesis sources by flattening them? This would lead to a situation where different records may share the same ID. Should I rather use a (stream name, position) tuple for the ID (which is unique in this case)?

In Dataflow, each UnboundedSource (that implements getCurrentRecordId and overrides requiresDeduping to return true) is de-duped on its own. Thus, record IDs are only required to be unique for the same source instance. Records from two different sources can use the same record IDs, and they will not be treated as “duplicates” during flattening. So if Amazon Kinesis guarantees that all records have IDs that are globally unique (across all shards within a stream) and persistent (across resharding operations, for example), then these should be suitable for use as the record ID.
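The per-source scoping of record IDs can be pictured with a small toy model in plain Java. PerSourceDeduper is a hypothetical class that only mimics the behavior described above; it is not how the Dataflow service is implemented.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model: record IDs are remembered separately for each source. */
final class PerSourceDeduper {
    private final Map<String, Set<String>> seenBySource = new HashMap<>();

    /**
     * Returns true if the record should be emitted, false if the same source
     * has already produced a record with this ID.
     */
    boolean accept(String sourceId, String recordId) {
        return seenBySource
                .computeIfAbsent(sourceId, k -> new HashSet<>())
                .add(recordId);
    }
}
```

In this model, accepting ("A", "1") twice emits the record once, while ("B", "1") is still emitted, because the ID "1" is only compared against other records from source B.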

Note that getCurrentRecordId is an optional method for UnboundedReader: you do not need to implement it if your checkpointing scheme uniquely identifies each record. Kinesis lets you read records in sequence number order, and it looks like sequence numbers are globally unique. Thus you might be able to assign each shard to a different worker in generateInitialSplits, and each worker may never produce duplicate data. In this case, you may not need to worry about record IDs at all.

Most of this answer has assumed the simple case where your Kinesis streams never change their shards. If the sharding on the stream changes, your solution will become more complex. E.g., each worker could be responsible for more than one shard, so the checkpoint mark would be a map of shard -> sequence number instead of a single sequence number. Split and merged shards may also move around between different Dataflow workers to balance load, and it may be hard to guarantee that no Kinesis record is ever read twice by two different workers. In this case, using Kinesis record IDs with the semantics you described should suffice.
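A checkpoint mark covering several shards might look like the following minimal sketch in plain Java. MultiShardCheckpoint and its methods are hypothetical names, sequence numbers are assumed to be decimal strings compared numerically, and the SDK interface and coder a real checkpoint mark needs are omitted.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical checkpoint: last produced sequence number for each shard. */
final class MultiShardCheckpoint {
    private final Map<String, BigInteger> lastRead = new HashMap<>();

    /** Records progress on a shard; never moves a shard's position backwards. */
    void advance(String shardId, String sequenceNumber) {
        lastRead.merge(shardId, new BigInteger(sequenceNumber), BigInteger::max);
    }

    /** True if the record is at or before the checkpointed position of its shard. */
    boolean alreadyProduced(String shardId, String sequenceNumber) {
        BigInteger last = lastRead.get(shardId);
        return last != null && new BigInteger(sequenceNumber).compareTo(last) <= 0;
    }

    int shardCount() {
        return lastRead.size();
    }
}
```

A shard that appears after a split or merge simply has no entry yet, so none of its records count as already produced until advance is called for it.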

Upvotes: 3
