Roman T

Reputation: 2000

Azure Event Hubs for Kafka with 2 consumers from the same group rebalance infinitely

I use Azure Event Hubs for Kafka and Spring Kafka 1.3.5 (for compatibility reasons) on the consumer side. Here is my config for that:

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

@EnableKafka
@Configuration
class EventHubsKafkaConfig(@Value("\${eventhubs.broker}") val eventHubsBroker: String,
                           @Value("\${eventhubs.new-mails.shared-access-key}") val newMailsEventHubSharedKey: String,
                           @Value("\${eventhubs.consumer-group}") val consumerGroup: String) {
    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, NewMailEvent>):
            ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent> {
        val factory = ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent>()
        factory.consumerFactory = consumerFactory
        return factory
    }

    @Bean
    fun consumerFactory(consumerConfigs: Map<String, Any>) =
            DefaultKafkaConsumerFactory<Int, NewMailEvent>(consumerConfigs, IntegerDeserializer(),
                    JsonDeserializer(NewMailEvent::class.java, jacksonObjectMapper()))

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        // Event Hubs takes the full connection string as the SASL PLAIN password
        val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"

        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                SaslConfigs.SASL_MECHANISM to "PLAIN",
                SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"\$ConnectionString\" password=\"$connectionString\";",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
        )
    }
}

and the consumer component:

import mu.KLogging
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class NewMailEventConsumer {
    @KafkaListener(topics = ["\${eventhubs.new-mails.topic-name}"])
    fun newMails(newMailEvent: NewMailEvent) {
        logger.info { "new mail event: $newMailEvent" }
    }

    companion object : KLogging()
}

data class NewMailEvent(val mailbox: String, val mailUuid: String)

When I start 2 instances of the consumer app with this code, I see strange warnings that never end:

Successfully joined group offer-application-bff-local with generation 5
web_1  | 2018-07-09 11:20:42.950  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1  | 2018-07-09 11:20:42.983  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1  | 2018-07-09 11:21:28.686  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1  | 2018-07-09 11:21:28.687  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1  | 2018-07-09 11:21:28.687  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1  | 2018-07-09 11:21:28.687  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[offer-mail-crawler-new-mails-0]
web_1  | 2018-07-09 11:21:28.688  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group offer-application-bff-local
web_1  | 2018-07-09 11:21:29.670  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) dead for group offer-application-bff-local
web_1  | 2018-07-09 11:21:43.099  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) for group offer-application-bff-local.
web_1  | 2018-07-09 11:21:43.131  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group offer-application-bff-local
web_1  | 2018-07-09 11:21:43.344  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group offer-application-bff-local with generation 7
web_1  | 2018-07-09 11:21:43.345  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1  | 2018-07-09 11:21:43.375  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1  | 2018-07-09 11:21:46.377  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
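
For reference, the tunables that warning mentions map to ordinary consumer properties. A minimal sketch of how they could be added to the consumerConfigs() map above; the values are illustrative assumptions, not recommendations (and, as it turned out, poll tuning was not the root cause here):

import org.apache.kafka.clients.consumer.ConsumerConfig

// Hedged sketch: the knobs the warning message refers to, to be merged into
// the consumer properties map. Values are illustrative assumptions.
fun pollTuning(): Map<String, Any> = mapOf(
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 100,       // smaller batches per poll()
        ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 30_000   // wider liveness window
)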

Periodically, the following exception occurs:

2018-07-09 11:36:21.602  WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.common.protocol.Errors  : Unexpected error code: 60.
web_1  | 2018-07-09 11:36:21.603 ERROR 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Container exception
web_1  |
web_1  | org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:628) ~[spring-kafka-1.3.5.RELEASE.jar!/:na]
web_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
web_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]

and periodically this one:

Failed to send SSL Close message 

java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_162]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_162]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_162]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_162]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_162]
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) ~[kafka-clients-0.11.0.2.jar:na]

With a single consumer it works like a charm: no warnings, nothing. Does anyone have a clue what is going wrong here?

Upvotes: 3

Views: 2437

Answers (2)

Roman T

Reputation: 2000

Eventually, I found out what the problem was. As you can see in the code above, I didn't specify the client.id property for the Kafka consumer. That turned out to be crucial for spring-kafka, because it used the same auto-generated client.id (consumer-0) for both consumers in the consumer group, and that resulted in the infinite rebalancing of partitions between the two identically named consumers. I needed to set it to a partially random string, ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}", to get it working:

@Bean
fun consumerConfigs(): Map<String, Any> {
    val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"

    return mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
            // a client.id that is unique per instance breaks the rebalance loop
            ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}",
            ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
            SaslConfigs.SASL_MECHANISM to "PLAIN",
            SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                    "username=\"\$ConnectionString\" password=\"$connectionString\";",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
    )
}
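
If a fresh UUID on every startup is undesirable, a stable per-instance id works as well. A minimal sketch, assuming each instance runs in its own container or host so the hostname is unique:

import java.net.InetAddress

// Hedged alternative: derive client.id from the hostname, so each app
// instance gets a unique but restart-stable id. Assumes one instance per host.
val clientId = "bff-${InetAddress.getLocalHost().hostName}"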

Upvotes: 3

kkflf

Reputation: 2570

You cannot have more actively consuming consumers with the same group-id than there are partitions on a given topic; any extra consumers sit idle.

E.g. a topic with 3 partitions can be served by 1-3 consumers using the same group-id.

I assume your topic has only one partition and the two consumers keep fighting over it. You will either have to remove one of your consumers or add a partition to your topic.
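
To check the partition count, the consumer API can report it directly. A minimal sketch reusing the consumerConfigs() map from the question; partitionsFor is part of the 0.11 consumer API:

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hedged sketch: print how many partitions a topic has, using a throwaway
// consumer built from the same properties as in the question.
fun printPartitionCount(configs: Map<String, Any>, topic: String) {
    KafkaConsumer(configs, ByteArrayDeserializer(), ByteArrayDeserializer()).use { consumer ->
        println("$topic has ${consumer.partitionsFor(topic).size} partition(s)")
    }
}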

Upvotes: 0
