I have a Kafka producer that sends messages using the sendMessage method below. The method takes a topic name, group name, key, and message payload, then processes and publishes multiple messages from a JSON array.
public void sendMessage(String topicName, String groupName, String key, String msg) throws Exception {
    if (msg != null) {
        JSONObject jsonReq = new JSONObject(msg);
        if (key == null || topicName == null || "".equals(key) || "".equals(topicName)) {
            throw new CustomException("Invalid key/topicname.");
        }
        if (jsonReq.has(groupName)) {
            JSONArray messages = jsonReq.getJSONArray(groupName);
            if (messages.length() == 0) {
                log.info("No messages found in groupName: {}. Skipping processing.", groupName);
                return;
            }
            List<String> messageIds = new ArrayList<>();
            AtomicBoolean errorFlag = new AtomicBoolean(false);
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < messages.length(); i++) {
                JSONObject obj = messages.getJSONObject(i);
                if (obj.has(key) && StringUtils.isNotBlank(obj.getString(key))) {
                    ProducerRecord<String, String> producerRecord =
                            new ProducerRecord<>(topicName, obj.getString(key), obj.toString());
                    producer.send(producerRecord, new CRKafkaCallBackHandler(producerRecord, errorFlag));
                    messageIds.add(topicName + "\t" + obj.getString(key));
                } else {
                    throw new CustomException("Mandatory property '" + key + "' is missing in message: " + obj.toString());
                }
            }
            long endTime = System.currentTimeMillis();
            log.info("Total Messages: {} For Topic: {} Time Taken: {} ms", messages.length(), topicName, (endTime - startTime));
            if (errorFlag.get()) {
                throw new CustomException("Failed to publish one or more messages to Kafka");
            }
        } else {
            throw new CustomException("Invalid groupName found in request.");
        }
    } else {
        throw new CustomException("Received a NULL or invalid message.");
    }
    log.info("Message processing completed successfully For Topic: {}", topicName);
}
Right now, this method runs on a single thread, and we are getting the following TimeoutException:
org.apache.kafka.common.errors.TimeoutException: Expiring 18 record(s) 120001 ms has passed since batch creation
I suspect it might be a bottleneck under high load. I want to improve throughput by implementing a multi-threaded Kafka producer.
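For context on the error: the 120001 ms in the exception is just past 120000 ms, which is the documented default for the producer's delivery.timeout.ms, so the records appear to be expiring in the producer's buffer before they are delivered. Below is a minimal sketch of the settings that seem relevant, assuming the producer is built from a plain Properties object. The class and method names are hypothetical, and the linger.ms value is illustrative rather than our actual configuration.

```java
import java.util.Properties;

// Hypothetical holder for the timeout-related producer settings discussed above.
class ProducerTimeoutSettingsSketch {

    // Returns only the timeout-related keys; bootstrap servers, serializers,
    // and other required settings are deliberately left out of this sketch.
    static Properties timeoutSettings() {
        Properties props = new Properties();
        // Upper bound on the time to report success or failure after send() returns.
        // 120000 ms is the documented default and matches the error above.
        props.put("delivery.timeout.ms", "120000");
        // How long the client waits for a broker response (documented default: 30000 ms).
        props.put("request.timeout.ms", "30000");
        // How long a batch may wait for more records before being sent (illustrative value).
        props.put("linger.ms", "5");
        return props;
    }
}
```

Kafka requires delivery.timeout.ms to be at least linger.ms + request.timeout.ms, which the values above satisfy.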
What is the best way to implement a multi-threaded Kafka producer in Java? Should I create a fixed-size thread pool and submit sendMessage tasks, or is there a more Kafka-friendly approach? How can I ensure thread safety? Since Kafka’s send() method is asynchronous, do I need to worry about concurrency issues with the producer instance?
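To make the thread-pool option concrete, this is roughly what I have in mind. It is only a sketch: MessageSender is a hypothetical interface standing in for the class that owns sendMessage, and sendAll and the pool size are names I made up for illustration. Every task would call into the same underlying producer instance, which is the part I am unsure about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ThreadPoolSendSketch {

    // Hypothetical stand-in for the class that owns the sendMessage method above.
    interface MessageSender {
        void sendMessage(String topicName, String groupName, String key, String msg) throws Exception;
    }

    // Submits one sendMessage task per request payload to a fixed-size pool
    // and returns how many of those tasks threw an exception.
    static int sendAll(MessageSender sender, String topicName, String groupName,
                       String key, List<String> payloads, int poolSize)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (String payload : payloads) {
                // Returning null makes the lambda a Callable, so checked
                // exceptions from sendMessage are allowed to propagate.
                futures.add(pool.submit(() -> {
                    sender.sendMessage(topicName, groupName, key, payload);
                    return null;
                }));
            }
            int failures = 0;
            for (Future<?> future : futures) {
                try {
                    future.get(); // blocks until this task has finished
                } catch (ExecutionException e) {
                    failures++; // the task threw; the cause is e.getCause()
                }
            }
            return failures;
        } finally {
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        }
    }
}
```

Each payload here is one full JSON request of the kind sendMessage already accepts, so the loop inside sendMessage stays unchanged and only the callers become concurrent.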