Reputation: 25
I have a Kafka producer class that initializes a new connection every time it produces data, which is time-consuming. To make it faster, I want to implement Kafka connection pooling. I have searched a lot but have not found a suitable solution. Please point me to the right approach. Thanks. My Kafka producer class is:
import java.util.Properties;
import org.apache.log4j.Logger;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("deprecation")
public class KafkaProducer1 implements ProducerService {
    private static Producer<Integer, String> producer;
    private static final String topic = "mytopic1";
    private Logger logger = Logger.getLogger(KafkaProducer1.class);

    @Override
    public void initialize() {
        try {
            Properties producerProps = new Properties();
            producerProps.put("metadata.broker.list", "192.168.21.182:9092");
            producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
            producerProps.put("request.required.acks", "1");
            ProducerConfig producerConfig = new ProducerConfig(producerProps);
            producer = new Producer<Integer, String>(producerConfig);
        } catch (Exception e) {
            logger.error("Exception while sending data to server " + e, e);
        }
        logger.info("Test Message");
    }

    @Override
    public void publishMessage(String jsonPacket) {
        KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String>(topic, jsonPacket);
        // This publishes the message on the given topic
        producer.send(keyedMsg);
    }

    @Override
    public void callMessage(String jsonPacket) {
        initialize();
        // Publish message
        publishMessage(jsonPacket);
        // Close the producer
        producer.close();
    }
}
Upvotes: 1
Views: 2882
Reputation: 11
If I understand correctly, you need a pool of producer objects that are always available when a new publish request arrives and that wait for the next request once a task completes. Your requirement matches the 'object pool' pattern (an object factory combined with a pool, similar to the executor framework in Java). Apache Commons provides an implementation of this concept in its jar, which you can use to get a KafkaProducer object from the pool. See https://dzone.com/articles/creating-object-pool-java
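To make the object-pool idea concrete, here is a minimal sketch that uses only `java.util.concurrent` instead of Apache Commons. The class `SimplePool` and its methods `borrow`, `giveBack` and `available` are hypothetical names made up for illustration, and the sketch assumes the pooled object can safely be reused for several sends.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; not part of Kafka or Apache Commons.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    // Creates `size` objects up front so the construction cost is paid once.
    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Blocks until an object is free, then hands it to the caller.
    T borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a pooled object", e);
        }
    }

    // Returns the object to the pool; false means the pool was already full.
    boolean giveBack(T obj) {
        return idle.offer(obj);
    }

    // Number of objects currently idle in the pool.
    int available() {
        return idle.size();
    }
}
```

With the class from the question, the factory would be the place where `new Producer<Integer, String>(producerConfig)` is called, and `callMessage` would borrow a producer, send, and give it back in a `finally` block rather than calling `initialize()` and `producer.close()` on every message.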
Upvotes: 1
Reputation: 168
You can put all the messages in an array, publish them to the topic one by one, and close the producer when you are done. This way initialization and close (or destroy) are each called only once. You can do something like this:
String[] jsonPacket = { /* your messages */ };
for (int i = 0; i < jsonPacket.length; i++) {
    producer.send(new KeyedMessage<Integer, String>(topic, jsonPacket[i]));
}
// Close once, after all messages have been sent
producer.close();
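When the messages do not all arrive at once, the same "initialize once" idea can be sketched as a holder that builds the expensive object on first use and returns the same instance afterwards. `SharedInstance` is a hypothetical helper, not a Kafka class, and the sketch assumes the wrapped object may be shared by all callers.

```java
import java.util.function.Supplier;

// Hypothetical lazy holder; builds the object once and reuses it.
class SharedInstance<T> {
    private final Supplier<T> factory;
    private T instance;

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized so that concurrent callers cannot trigger two constructions.
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }
}
```

In the question's class, the supplier would contain the body of `initialize()`, `publishMessage` would call `get()` before sending, and `producer.close()` would move to application shutdown.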
Upvotes: 0