World of Titans
How to implement a multi-threaded Kafka producer in Java

I have a Kafka producer that sends messages using the sendMessage method below. The method takes a topic name, group name, key, and message payload, then processes and publishes multiple messages from a JSON array.

public void sendMessage(String topicName, String groupName, String key, String msg) throws Exception {
    if (msg != null) {
        JSONObject jsonReq = new JSONObject(msg);
        
        if (key == null || topicName == null || "".equals(key) || "".equals(topicName)) {
            throw new CustomException("Invalid key/topicname.");
        }
        if (jsonReq.has(groupName)) {
            JSONArray messages = jsonReq.getJSONArray(groupName);
            if (messages.length() == 0) {
                log.info("No messages found in groupName: {}. Skipping processing.", groupName);
                return;
            }

            List<String> messageIds = new ArrayList<>();
            AtomicBoolean errorFlag = new AtomicBoolean(false);
            long startTime = System.currentTimeMillis();

            for (int i = 0; i < messages.length(); i++) {
                JSONObject obj = messages.getJSONObject(i);
                if (obj.has(key) && StringUtils.isNotBlank(obj.getString(key))) {
                    ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topicName, obj.getString(key), obj.toString());

                    producer.send(producerRecord, new CRKafkaCallBackHandler(producerRecord, errorFlag));
                    messageIds.add(topicName + "\t" + obj.getString(key));
                } else {
                    throw new CustomException("Mandatory property '" + key + "' is missing in message: " + obj.toString());
                }
            }

            long endTime = System.currentTimeMillis();
            log.info("Total Messages: {} For Topic: {} Time Taken: {} ms", messages.length(), topicName, (endTime - startTime));

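            // send() is asynchronous, so some callbacks may not have run yet;
            // at this point the flag only reflects failures reported so far.
            if (errorFlag.get()) {
                throw new CustomException("Failed to publish one or more messages to Kafka");
            }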
        } else {
            throw new CustomException("Invalid groupName found in request.");
        }
    } else {
        throw new CustomException("Received a NULL or invalid message.");
    }
    log.info("Message processing completed successfully For Topic: {}", topicName);
}

Right now, this method runs on a single thread, and we are getting a TimeoutException:

org.apache.kafka.common.errors.TimeoutException: Expiring 18 record(s) 120001 ms has passed since batch creation

I suspect it might be a bottleneck under high load. I want to improve throughput by implementing a multi-threaded Kafka producer.
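For context, the 120001 ms in the exception is just past the producer's default delivery.timeout.ms of 120000 ms, which suggests the batches expired while waiting to be delivered rather than the loop itself being slow. Below is a minimal sketch of the settings that interact with that timeout. The keys are standard Kafka producer config names; the values, the class name ProducerTuningSketch, and the broker address are illustrative assumptions, not tested recommendations.

```java
import java.util.Properties;

final class ProducerTuningSketch {

    // Builds producer settings as plain string keys and values.
    // The values are assumptions for illustration only.
    static Properties tunedProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on the time a record may spend batching, in flight, and retrying
        // before it is expired. Must be >= linger.ms + request.timeout.ms.
        props.put("delivery.timeout.ms", "120000");
        // How long to wait for a broker response to a single request.
        props.put("request.timeout.ms", "30000");
        // Small delay so that more records are grouped into each batch.
        props.put("linger.ms", "20");
        // Maximum batch size in bytes per partition.
        props.put("batch.size", "65536");
        // Compressing batches reduces the bytes sent per request.
        props.put("compression.type", "lz4");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; in real code they would be passed to the producer constructor.
        tunedProducerProps("localhost:9092")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

If the brokers cannot keep up or are unreachable, raising these values only delays the same exception, so broker-side health and network connectivity are probably worth checking as well.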

What is the best way to implement a multi-threaded Kafka producer in Java? Should I create a fixed-size thread pool and submit sendMessage tasks, or is there a more Kafka-friendly approach? How can I ensure thread safety? Since Kafka’s send() method is asynchronous, do I need to worry about concurrency issues with the producer instance?
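To make the thread-pool idea concrete: the KafkaProducer Javadoc describes the producer as thread safe and notes that sharing one instance across threads is generally faster than using several instances. Here is a rough sketch of that pattern, in which a fixed pool submits work to one shared sender and the caller waits for every acknowledgement before deciding whether anything failed. AsyncSender, Message, and publishAll are hypothetical names invented for this example; AsyncSender stands in for the producer so that the snippet runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SharedProducerSketch {

    /** Hypothetical stand-in for one shared, thread-safe producer instance. */
    interface AsyncSender {
        CompletableFuture<Void> send(String topic, String key, String value);
    }

    /** Hypothetical key/value pair; the real code builds these from the JSON array. */
    static final class Message {
        final String key;
        final String value;
        Message(String key, String value) { this.key = key; this.value = value; }
    }

    /** Sends every batch through the single shared sender; returns the number of failed records. */
    static int publishAll(AsyncSender sender, String topic, List<List<Message>> batches, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<CompletableFuture<Void>> pending = Collections.synchronizedList(new ArrayList<>());
        for (List<Message> batch : batches) {
            pool.execute(() -> {
                for (Message m : batch) {
                    try {
                        pending.add(sender.send(topic, m.key, m.value));
                    } catch (RuntimeException e) {
                        // A synchronous failure in send() is recorded as a failed future.
                        CompletableFuture<Void> failed = new CompletableFuture<>();
                        failed.completeExceptionally(e);
                        pending.add(failed);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
                throw new IllegalStateException("Timed out waiting for send tasks");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for send tasks", e);
        }
        // Wait for every acknowledgement before counting failures.
        int failures = 0;
        for (CompletableFuture<Void> f : pending) {
            try {
                f.join();
            } catch (CompletionException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Fake sender for the demo: acknowledges everything except the key "bad".
        AsyncSender fake = (topic, key, value) -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            if ("bad".equals(key)) {
                f.completeExceptionally(new RuntimeException("simulated broker error"));
            } else {
                f.complete(null);
            }
            return f;
        };
        List<List<Message>> batches = Arrays.asList(
                Arrays.asList(new Message("a", "1"), new Message("bad", "2")),
                Arrays.asList(new Message("b", "3")));
        System.out.println("failed records: " + publishAll(fake, "demo-topic", batches, 2)); // prints "failed records: 1"
    }
}
```

With the real client, AsyncSender could presumably be implemented by completing the future inside the callback passed to producer.send(record, callback), where a non-null exception argument marks a failure. Since send() only appends to an in-memory buffer, more threads may not help much if the broker or network is the limiting factor.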
