Reputation: 2000
I use Azure Event Hubs for Kafka with Spring Kafka 1.3.5 (for compatibility reasons) on the consumer side. Here is my config:
@EnableKafka
@Configuration
class EventHubsKafkaConfig(@Value("\${eventhubs.broker}") val eventHubsBroker: String,
                           @Value("\${eventhubs.new-mails.shared-access-key}") val newMailsEventHubSharedKey: String,
                           @Value("\${eventhubs.consumer-group}") val consumerGroup: String) {

    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, NewMailEvent>):
            ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent> {
        val factory = ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent>()
        factory.consumerFactory = consumerFactory
        return factory
    }

    @Bean
    fun consumerFactory(consumerConfigs: Map<String, Any>) =
            DefaultKafkaConsumerFactory<Int, NewMailEvent>(consumerConfigs, IntegerDeserializer(),
                    JsonDeserializer(NewMailEvent::class.java, jacksonObjectMapper()))

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"
        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                SaslConfigs.SASL_MECHANISM to "PLAIN",
                SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"\$ConnectionString\" password=\"$connectionString\";",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
        )
    }
}
and the consumer component:
@Component
class NewMailEventConsumer {

    @KafkaListener(topics = ["\${eventhubs.new-mails.topic-name}"])
    fun newMails(newMailEvent: NewMailEvent) {
        logger.info { "new mail event: $newMailEvent" }
    }

    companion object : KLogging()
}

data class NewMailEvent(val mailbox: String, val mailUuid: String)
When I start two consumer apps with this code, I see strange warnings that never end:
Successfully joined group offer-application-bff-local with generation 5
web_1 | 2018-07-09 11:20:42.950 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1 | 2018-07-09 11:20:42.983 INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1 | 2018-07-09 11:21:28.686 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1 | 2018-07-09 11:21:28.687 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1 | 2018-07-09 11:21:28.687 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1 | 2018-07-09 11:21:28.687 INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[offer-mail-crawler-new-mails-0]
web_1 | 2018-07-09 11:21:28.688 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group offer-application-bff-local
web_1 | 2018-07-09 11:21:29.670 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) dead for group offer-application-bff-local
web_1 | 2018-07-09 11:21:43.099 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) for group offer-application-bff-local.
web_1 | 2018-07-09 11:21:43.131 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group offer-application-bff-local
web_1 | 2018-07-09 11:21:43.344 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group offer-application-bff-local with generation 7
web_1 | 2018-07-09 11:21:43.345 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1 | 2018-07-09 11:21:43.375 INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1 | 2018-07-09 11:21:46.377 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Periodically there is also the following exception:
2018-07-09 11:36:21.602 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.common.protocol.Errors : Unexpected error code: 60.
web_1 | 2018-07-09 11:36:21.603 ERROR 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Container exception
web_1 |
web_1 | org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:628) ~[spring-kafka-1.3.5.RELEASE.jar!/:na]
web_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
web_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
and periodically this one:
Failed to send SSL Close message
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_162]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_162]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_162]
at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_162]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_162]
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) ~[kafka-clients-0.11.0.2.jar:na]
With a single consumer it works like a charm: there are no warnings at all. Does anyone have a clue what is going wrong here?
Upvotes: 3
Views: 2437
Reputation: 2000
Eventually, I found out what the problem was.
As you can see in the code, I didn't specify the client.id property for the Kafka consumer. That turned out to be crucial, because spring-kafka then used an auto-generated client.id = consumer-0 for both consumers in the consumer group. That resulted in endless rebalancing of partitions between the two consumers with the same name. I needed to set it to a partially random string, ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}", to get it working:
@Bean
fun consumerConfigs(): Map<String, Any> {
    val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"
    return mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
            ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}",
            ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
            SaslConfigs.SASL_MECHANISM to "PLAIN",
            SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                    "username=\"\$ConnectionString\" password=\"$connectionString\";",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
    )
}
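The essence of this fix is that each application instance ends up with its own client.id while still sharing the group.id. Here is a minimal sketch of just that part, assuming a hypothetical helper named uniqueClientId and plain string property keys instead of the ConsumerConfig constants, so that it runs without Kafka on the classpath:

```kotlin
import java.util.UUID

// Hypothetical helper (not part of spring-kafka): builds a client.id that is
// unique per application instance by appending a random UUID to a fixed prefix.
fun uniqueClientId(prefix: String): String = "$prefix-${UUID.randomUUID()}"

// Only the entries relevant to the fix. "client.id" and "group.id" are the
// standard Kafka consumer property names behind the ConsumerConfig constants.
fun identityConfigs(consumerGroup: String): Map<String, Any> = mapOf(
    "client.id" to uniqueClientId("bff"),
    "group.id" to consumerGroup
)

fun main() {
    // Simulates two application instances building their consumer config.
    val first = identityConfigs("offer-application-bff-local")
    val second = identityConfigs("offer-application-bff-local")
    println(first["client.id"])   // same group, but a different client.id ...
    println(second["client.id"])  // ... for each instance
}
```

A random suffix changes on every restart. If stable names are easier to follow in logs, a value derived from the host or container name would probably serve the same purpose, as long as it differs between instances.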
Upvotes: 3
Reputation: 2570
Within a consumer group, each partition is assigned to at most one consumer, so you cannot have more active consumers sharing a group-id than the topic has partitions.
E.g. a topic with 3 partitions can have 1-3 active consumers using the same group-id.
I assume your topic has only one partition and the two consumers keep fighting for it. You will either have to remove one of your consumers or add a partition to your topic.
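To illustrate the relationship between partitions and consumers in one group, here is a toy model. It is a sketch only: assignPartitions is a hypothetical function that deals partitions out round-robin, not Kafka's actual assignor, and the member names are made up.

```kotlin
// Toy model, not Kafka's real assignor: deal partitions out round-robin to the
// sorted member ids, so that every partition has exactly one owner.
fun assignPartitions(members: List<String>, partitionCount: Int): Map<String, List<Int>> {
    val sorted = members.sorted()
    val assignment = sorted.associateWith { mutableListOf<Int>() }
    if (sorted.isEmpty()) return assignment
    for (partition in 0 until partitionCount) {
        assignment.getValue(sorted[partition % sorted.size]).add(partition)
    }
    return assignment
}

fun main() {
    // One partition, two consumers: only one of them receives work.
    println(assignPartitions(listOf("consumer-a", "consumer-b"), 1))
    // prints {consumer-a=[0], consumer-b=[]}

    // Three partitions, two consumers: both are busy.
    println(assignPartitions(listOf("consumer-a", "consumer-b"), 3))
    // prints {consumer-a=[0, 2], consumer-b=[1]}
}
```

In this model a second consumer on a single-partition topic simply receives nothing, which is why adding partitions is what lets additional consumers do work.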
Upvotes: 0