Reputation: 15
I am attempting to send messages to an Azure Event Hub (Premium) using Apache Camel 4.4.3 with its Kafka component on Java 17, operating under Windows 10. While I have successfully used other Kafka clients (like Kafka UI, Confluent's Java client, and directly through Apache Camel's Azure Eventhubs component) to connect and send messages, the Camel Kafka component consistently fails for me during the authentication process.
Here's the essential part of the code configuring the Kafka endpoint:
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
String kafkaEndpointUri = "kafka:{{eventhub.name}}?brokers={{eventhub.namespace}}.servicebus.windows.net:9093"
+ "&securityProtocol=SASL_SSL"
+ "&saslMechanism=PLAIN"
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{{eventhub.connectionstring}}\";";
from("timer://foo?repeatCount=1")
.setBody(constant("Hello from Camel to Azure EventHub!"))
.to(kafkaEndpointUri);
}
});
The application throws a runtime exception during SASL authentication:
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
[main] DEBUG org.apache.camel.impl.DefaultCamelContext - start() took 536 millis
Camel application is running. Press Ctrl + C to terminate.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=EVENTHUB_NAMESPACE.servicebus.windows.net;mechs=[PLAIN]
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.SslTransportLayer - [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=EVENTHUB_NAMESPACE.servicebus.windows.net/XXX.XXX.XXX.XXX:9093], selector=sun.nio.ch.WEPollSelectorImpl@2643665b, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'EVENTHUB_NAMESPACE.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net, O=Microsoft Corporation, L=Redmond, ST=WA, C=US' protocol 'TLSv1.3' cipherSuite 'TLS_AES_256_GCM_SHA384'
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to INITIAL
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
[Camel (camel-1) thread #1 - timer://foo] DEBUG org.apache.camel.processor.SendProcessor - >>>> kafka://EVENTHUB_NAME?brokers=EVENTHUB_NAMESPACE.servicebus.windows.net%3A9093&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL Exchange[]
[Camel (camel-1) thread #1 - timer://foo] DEBUG org.apache.camel.component.kafka.KafkaProducer - Sending message to topic: EVENTHUB_NAME, partition: null, key: null
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Unexpected error from EVENTHUB_NAMESPACE.servicebus.windows.net/XXX.XXX.XXX.XXX (channelId=-1); closing connection
java.lang.RuntimeException: non-nullable field authBytes was serialized as null
The working example with Apache Camel with Event Hubs component looks like this:
...
.to(String.format("azure-eventhubs:?connectionString=RAW(%s)", connectionStringWithTopic));
Working example with Confluents Java Client:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectioString));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
Producer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(eventHubName, "key", "This is a message from regular Java app using Confluent Kafka!");
producer.send(record);
Given that there are no issues when using other clients, I suspect there might be a compatibility issue between the Apache Camel Kafka component and Azure Event Hubs or a specific configuration nuance with Camel. I have tried setting a bunch of additional properties such as sslProtocol, clientId etc. but none of my attempts have been successful.
Has anyone experienced similar issues or could provide insights into what might be going wrong here? Could there be specific settings or configurations in Apache Camel that I might be overlooking?
Upvotes: 0
Views: 124
Reputation: 15
Thanks for Ralphi1 for finding the solution to my issue. Here is code that solved my problem:
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
String kafkaEndpointUri = "kafka:{{eventhub.name}}?brokers={{eventhub.namespace}}.servicebus.windows.net:9093"
+ "&securityProtocol=SASL_SSL"
+ "&saslMechanism=PLAIN"
+ "&saslJaasConfig=RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{{eventhub.connectionstring}}\";)";
from("timer://foo?repeatCount=1")
.setBody(constant("Hello from Camel to Azure EventHub!"))
.to(kafkaEndpointUri);
}
});
The key here was using the RAW()
function to make sure that special characters in the connectionstring doesn't get misinterpreted by the Camel Parser.
Upvotes: 0