Maayan Hope

Reputation: 1611

SchemaException - Error reading field 'leader_id': String length -1 cannot be negative - Azure event hubs - Kafka

I am getting this exception while the Kafka consumer group is rebalancing:

org.apache.kafka.common.protocol.types.SchemaException

These are the details:

  1. Using Azure Event Hubs, accessed through the Kafka API

  2. "Kafka Enabled" = yes in the Azure portal

  3. using: compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.0.2'

  4. Using a consumer group

  5. Consumer configuration:

    Properties properties = new Properties();
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s.servicebus.windows.net:9093", this.namespace));
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MeasurementDeSerializer.class.getName());
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupName);
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    

I have 2 clients on 2 different PCs.

When they both run, each gets 16 of the 32 available partitions.

When I shut down one of them, all the partitions are rebalanced to the other one.
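To make the expected split concrete, here is a minimal sketch of a range-style assignment, which is what the default assignor in this client version (`RangeAssignor`) does for a single topic. This is a simplified model and not Kafka's actual code; the class name `RangeAssignmentSketch`, the method `assign`, and the member names are hypothetical, and the member list is assumed to be already sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous ranges, one per member.
    // Members earlier in the list receive one extra partition when the split is uneven.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        int base = partitionCount / members.size();
        int extra = partitionCount % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            result.put(members.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two members (hypothetical names): each owns a contiguous block of 16 partitions.
        System.out.println(assign(Arrays.asList("instance-1", "instance-2"), 32));
        // One member left after the other shuts down: it owns all 32 partitions.
        System.out.println(assign(Arrays.asList("instance-1"), 32));
    }
}
```

With two members the second block is partitions 16 to 31, which matches the revoked list reported by the surviving instance; after the rebalance it should be assigned all 32.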

On the instance that is still running I get:

  1. Partitions Revoked [16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]

  2. Then, from the poll loop, I get this exception:

    org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'leader_id': String length -1 cannot be negative
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76)
        at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:279)
        at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:586)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:686)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:469)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)

On the other hand, there is no problem in the opposite direction, when an instance joins:

  1. Start the first instance

  2. Instance 1 gets all 32 partitions

  3. Start instance 2

  4. Re-balancing starts

  5. Instance 1 loses 16 partitions

  6. Instance 2 gets 16 partitions

Any idea what could cause this exception?

Upvotes: 0

Views: 732

Answers (1)

Arthur Erlendsson

Reputation: 176

For future readers: the issue has been fixed. See https://github.com/Azure/azure-event-hubs-for-kafka/issues/41

Upvotes: 0
