collarblind

Reputation: 4739

How does the Kafka S3 Connector Exactly-Once Delivery Guarantee Work

I have read their blog and understand their examples. https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/

But I am trying to wrap my head around this scenario I have. My current configurations are:

"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"

Based on what I've read about these configurations, the connector will commit a file of 50 records, or commit whatever it has after 300000 ms (5 minutes), whichever comes first. If the connector uploads a file to S3 but fails to commit its offsets back to Kafka, how will the re-uploaded records end up overwriting the existing S3 object, given that I have a rotate schedule interval set? Wouldn't this cause duplicates in S3?

Upvotes: 0

Views: 1307

Answers (2)

Vinodhini Chockalingam

Reputation: 326

Use rotate.interval.ms along with timestamp.extractor set to Record. Additionally, ensure that the topic you are reading from has its timestamp type set to "LOG_APPEND_TIME".

I am not sure whether the consumer isolation level should also be set to read_committed; it depends on whether the S3 connector does that automatically.

Even with all this, things can go wrong when the timestamp is not monotonically increasing, for example during leader elections in the broker cluster. Watch out for the status of this issue.
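As a rough sketch, the connector-side changes would look something like the snippet below. The consumer.override. prefix for the isolation level is an assumption that only applies if your Connect version and worker policy allow connector-level client overrides; the topic's LOG_APPEND_TIME timestamp type is a topic/broker setting (message.timestamp.type) and is not part of the connector config at all.

"rotate.interval.ms": "300000",
"rotate.schedule.interval.ms": "-1",
"timestamp.extractor": "Record",
"consumer.override.isolation.level": "read_committed"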

Upvotes: 0

Randall Hauch

Reputation: 7187

The S3 sink connector's documentation is another good resource that describes how the connector can guarantee exactly-once delivery to S3 and, more importantly, which combinations of features do (or do not) provide that guarantee.

Specifically, one of the sections in that document says:

To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees).

Your S3 sink connector configuration does use a deterministic partitioner (via "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"), but it uses the non-deterministic Wallclock timestamp extractor (via "timestamp.extractor": "Wallclock"). It's non-deterministic because if the connector does have to restart (e.g., because of a failure) and does reprocess a particular record, it will reprocess that record at a later time and the wallclock timestamp extractor will choose a different time for that record.

Secondly, your connector uses the rotate.schedule.interval.ms option, which the documentation notes is not compatible with exactly-once delivery. For example, if the connector does have to reprocess a series of Kafka records, it may break the records into different S3 objects than it did the first time, which means the connector ends up writing different S3 objects.

In summary, an S3 sink connector with your configuration will not provide exactly once delivery guarantees.
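For comparison, a variant of your configuration that sticks to the documented deterministic options might look like the following. The rotation interval value here is purely illustrative, not a recommendation, and this sketch assumes your records carry Kafka timestamps that are usable for partitioning.

"flush.size": "50",
"rotate.interval.ms": "3600000",
"rotate.schedule.interval.ms": "-1",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Record"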

Upvotes: 9
