alex

Reputation: 2193

Kafka consumer returning from poll earlier than the set pollTimeout?

I have created a Kafka consumer (using the @KafkaListener annotation) with the Spring Kafka starter. I set the pollTimeout on the containerFactory for my consumer to 5 seconds. When I send a batch of 100 messages at once, I notice that my consumer grabs some of the messages from the batch but not all of them. I expected the consumer to wait the full 5 seconds for additional records.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}

Config values that I'm setting:

kafka_consumer_concurrency: "1"
kafka_consumer_max_poll_interval_ms: "290000"
kafka_consumer_enable_auto_commit: "false"
kafka_consumer_request_timeout_ms: "300000"
kafka_consumer_fetch_max_wait_ms: "1000"
kafka_consumer_fetch_min_bytes: "50000000"
kafka_consumer_fetch_max_bytes: "52428800"
kafka_consumer_partition_fetch_max_bytes: "52428800"
kafka_consumer_poll_timeout_ms: "280000"

Kafka fetch request log statement:

[DEBUG] 20:50:08.028 [-0-C-1] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=consumer_listener] Using older server API v6 to send FETCH
{replica_id=-1,max_wait_time=1000,min_bytes=50000000,max_bytes=52428800,isolation_level=0,topics=[{topic=test-topic,partitions=[{partition=1,fetch_offset=988786,log_start_offset=-1,partition_max_bytes=52428800},
{partition=0,fetch_offset=1619712,log_start_offset=-1,partition_max_bytes=52428800},
{partition=2,fetch_offset=989663,log_start_offset=-1,partition_max_bytes=52428800}]}]} with correlation id 832 to node 124

Exact same issue is reported here

Upvotes: 0

Views: 2117

Answers (1)

Gary Russell

Reputation: 174554

The poll timeout is simply the maximum time we will wait for at least one record to become available; the poll returns as soon as any records arrive.

There is no poll.min.records property; there are only fetch.min.bytes and fetch.max.wait.ms.
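Since no minimum-record setting exists, the "keep waiting for more records" behaviour the question expects would have to be built by the application. The following is only a sketch for a hand-written poll loop, not something Spring Kafka or the Kafka client provides: `BatchAccumulator`, `pollUntil`, and the `pollOnce` function are hypothetical names, and `pollOnce` stands in for a single consumer poll that receives the remaining wait in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class BatchAccumulator {

    /**
     * Keeps polling until at least minRecords have been collected or
     * maxWaitMs has elapsed, whichever happens first.
     *
     * pollOnce is a hypothetical stand-in for one consumer poll: it is given
     * the remaining wait in milliseconds and returns whatever records arrived.
     */
    static <T> List<T> pollUntil(Function<Long, List<T>> pollOnce,
                                 int minRecords,
                                 long maxWaitMs) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;

        while (batch.size() < minRecords) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                break; // time budget used up: return what we have
            }
            // A single poll may return early with only part of the data,
            // so append the result and poll again.
            batch.addAll(pollOnce.apply(remainingMs));
        }
        return batch;
    }
}
```

The loop can return more than minRecords, because the last poll is appended whole, and fewer if the deadline passes first.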

fetch.min.bytes

The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.

fetch.max.wait.ms

The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
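To make the interaction of the two settings concrete, here is a minimal Java sketch of the rule the descriptions above state. It is a simplified model and not Kafka's broker code; the names `FetchResponseModel` and `brokerResponds` are made up for illustration, and the 100,000-byte figure used in the note below is an assumed size for the 100-message batch.

```java
final class FetchResponseModel {

    /**
     * Simplified model of when a fetch request is answered:
     * either enough data has accumulated (fetch.min.bytes),
     * or the server has waited long enough (fetch.max.wait.ms).
     */
    static boolean brokerResponds(long availableBytes,
                                  long waitedMs,
                                  long fetchMinBytes,
                                  long fetchMaxWaitMs) {
        boolean enoughData = availableBytes >= fetchMinBytes;
        boolean waitExpired = waitedMs >= fetchMaxWaitMs;
        return enoughData || waitExpired;
    }
}
```

With the values from the question's fetch request (min_bytes=50000000, max_wait_time=1000), `brokerResponds(100_000, 999, 50_000_000, 1000)` is false and `brokerResponds(100_000, 1000, 50_000_000, 1000)` is true. A small batch never reaches the byte threshold, so under this model the fetch is answered once the 1000 ms wait expires, with whatever data is available at that point.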

Upvotes: 1
