Not able connect to EventHub via KAFKA api

am getting below exception while connecting Event Hub via kafka libraries.

 Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative


ERROR [pool-6-thread-1] STREAM - code="SAE-SP-A-1000: Stream processing failed, exiting ...",exception="Invalid SASL mechanism response, server may be expecting a different protocol"
 org.apache.kafka.common.errors.IllegalSaslStateException: Invalid SASL mechanism response, server may be expecting a different protocol
 Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative
      at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:298) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:687) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:678) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:501) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveToken(SaslClientAuthenticator.java:435) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:259) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar!/:?]

The consumer properties are as given below:

bootstrap.servers=XXX-topics.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://XXXX-topics.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************";

Upvotes: 6

Views: 10469

Answers (4)

kism3t
kism3t

Reputation: 1361

For me it was this adding group id like:

bootstrap.servers=<namespace>.servicebus.windows.net:9093
group.id=$Default
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>";

Upvotes: 0

Sebcom GmbH
Sebcom GmbH

Reputation: 1

Based on the latest Kafka version, this may be the reasons for this exception.

  1. Authentication failed due to invalid credentials with brokers older than 1.0.0
  2. Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic)
  3. Transient network issue. (org.apache.kafka.clients.NetworkClient:759)

In my case the TLS traffic was blocked by the firewall

Upvotes: 0

user1853141
user1853141

Reputation: 94

You need to additionally use ssl.ca.cert= because you are using SASL_SSL and not SASL_PLAINTEXT. We need to use SASL_SSL to connect to azure event hub.

Upvotes: 0

DSchwilk
DSchwilk

Reputation: 181

This error occurs when publishing to a basic plan Event Hub, as the basic plan does not support interaction via Kafka protocol.

https://azure.microsoft.com/de-de/pricing/details/event-hubs/

An upgrade to a standard plan should resolve this.

Upvotes: 18

Related Questions