Reputation: 4654
Came across a perplexing issue after upgrading our Kafka version from 2.5.1 to 3.2.0.
I am sending messages to a kafka producer as per the below.
final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
"my_topic",
this.toString()
);
Future<RecordMetadata> kafkaResponse = producer.send(producerRecord);
// producer.flush();
String kafkaSuccessStatus = kafkaResponse.isDone()
? "Message sent"
: "Message didn't send";
System.out.println(kafkaSuccessStatus);
The above prints out that the message has been successfully sent. However, the queue has no data inside of it. Further, I get a strange warning.
org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
This seems incongruent with the message being sent. Any ideas on what could be going wrong here?
Just as a side note, this all previously worked in 2.5.1 but we have been forced to upgrade.
Upvotes: 1
Views: 1900
Reputation: 7238
That code is not checking whether a message was sent or not, it's checking if the Java future has finished executing or not.
You should use the version of send()
that accepts a callback to get more feed-back about the asynchronous emission success (see https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-), or you could block with get()
and either expect an instance of RecordMetadata
or an exception to be blowing up your current thread.
Read the javadoc of the send()
method above for more details.
Upvotes: 3