Reputation: 24844
I'm creating a custom SinkConnector using Kafka Connect (2.3.0) that needs to be optimized for throughput rather than latency. Ideally, what I want is:
Batches of ~20 megabytes or 100k records, whichever comes first, but if the message rate is low, process at least every minute (avoid small batches, but MySinkTask.put() should still be called at least once a minute).
This is what I set for consumer settings in an attempt to accomplish it:
consumer.fetch.min.bytes=1048576
I need this fetch.min.bytes setting, or else MySinkTask.put() is called multiple times per second despite the other settings...?
Now, what I observe in a low-rate situation is that MySinkTask.put() is called with 0 records multiple times, and several minutes pass until fetch.min.bytes is reached; then I get all the records at once.
I fail to understand so far:
I've double-checked the log output, and the lines under INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: as printed by the Connect runtime show the expected values that I pass with the consumer. prefix.
Upvotes: 0
Views: 2461
Reputation: 24844
The "process at least every interval" part does not seem possible, as the fetch.min.bytes consumer setting takes precedence and Connect does not allow you to dynamically adjust the ConsumerConfig while the Task is running. :-(
The work-around for now is batching in the Task manually: set fetch.min.bytes to 1 (yikes), buffer records in the Task on put() calls, and flush when necessary. This is not ideal, as it incurs some overhead in the Connector that I had hoped to avoid.
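A minimal sketch of such a buffer, in case it helps. It is deliberately free of Kafka dependencies; the class name BatchBuffer, its methods, and the thresholds are all hypothetical and not part of Connect. It flushes on a byte limit, a record limit, or the age of the oldest buffered record, whichever is hit first:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Connect: accumulates records
// across put() calls and reports when a batch is due.
final class BatchBuffer<T> {
    private final long maxBytes;       // e.g. 20 * 1024 * 1024
    private final int maxRecords;      // e.g. 100_000
    private final long maxAgeMillis;   // e.g. 60_000
    private final List<T> records = new ArrayList<>();
    private long bufferedBytes = 0;
    private long oldestMillis = -1;    // arrival time of the first buffered record

    BatchBuffer(long maxBytes, int maxRecords, long maxAgeMillis) {
        this.maxBytes = maxBytes;
        this.maxRecords = maxRecords;
        this.maxAgeMillis = maxAgeMillis;
    }

    // The caller supplies the size estimate and the current time.
    void add(T record, long sizeBytes, long nowMillis) {
        if (records.isEmpty()) {
            oldestMillis = nowMillis;
        }
        records.add(record);
        bufferedBytes += sizeBytes;
    }

    // True once any limit is reached; an empty buffer never flushes.
    boolean shouldFlush(long nowMillis) {
        if (records.isEmpty()) {
            return false;
        }
        return bufferedBytes >= maxBytes
                || records.size() >= maxRecords
                || nowMillis - oldestMillis >= maxAgeMillis;
    }

    // Returns the buffered records and resets the buffer.
    List<T> drain() {
        List<T> batch = new ArrayList<>(records);
        records.clear();
        bufferedBytes = 0;
        oldestMillis = -1;
        return batch;
    }

    int size() {
        return records.size();
    }
}
```

In MySinkTask.put() one would add each incoming record with an estimated size, then check shouldFlush(System.currentTimeMillis()) and write out drain() when it returns true. Since put() is also called with 0 records, the age check still runs when traffic is low. Note that records held back like this are not yet written, so the Task presumably also has to keep Connect from committing their offsets early, for example by overriding SinkTask.preCommit() or flush().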
How Connect ends up batching roughly twice per second between its consumer's poll and SinkTask.put() remains a mystery to me, but it's better than being called for every message.
Upvotes: 1