Reputation: 149
I am trying to optimize the performance of Kafka in a scenario with high latency (>500 ms) and intermittent packet loss. I am working in Java, using the 'kafka_2.13', version '2.5.0' API. I have 24 nodes connected to a single broker, and each node tries to send a small message to all the other subscribers. All nodes are able to communicate when there is no packet loss or latency, but they seem to stop communicating as soon as I add latency and packet loss. I will run more tests on Monday, but I was wondering if anyone had suggestions on possible configuration improvements.
Below you can see the code that I use to publish and receive messages, followed by the configurations used for the consumers and producers.
Publishers:
boolean sendAsyncMessage (byte[] value, String topic) {
    ProducerRecord<Long, byte[]> record = new ProducerRecord<> (topic, System.currentTimeMillis (), value);
    long msStart = System.currentTimeMillis ();
    producer.send (record, (metadata, exception) -> {
        long msDelta = System.currentTimeMillis () - msStart;
        logger.info ("Message with topic {} sent at {}, was acked after {}", topic, msStart, msDelta);
        // the callback receives a non-null exception when the send failed
        if (exception != null) {
            logger.error ("An exception was triggered during send", exception);
        }
    });
    producer.flush ();
    return true;
}
Subscribers:
while (keepGoing.get ()) {
    try {
        // the Java example does this on every iteration!
        subscribe ();
        ConsumerRecords<Long, byte[]> consumerRecords = consumer.poll (Duration.ofMillis (2000));
        manageMessage (consumerRecords);
        //Thread processRecords = new Thread (() -> manageMessage (consumerRecords));
        //processRecords.start ();
    } catch (Exception e) {
        logger.error ("Problem in polling: " + e.toString ());
    }
}
Producer:
properties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaBroker.KEY_SERIALIZER);
properties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaBroker.VALUE_SERIALIZER);
properties.put (ProducerConfig.ACKS_CONFIG, reliable ? "1" : "0");
// host1:port1,host2:port2
properties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
// how many bytes to buffer records waiting to be sent to the server
//properties.put (ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put (ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
properties.put (ProducerConfig.CLIENT_ID_CONFIG, clientID);
//15 HOURS (54,000,000 ms)
properties.put (ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 54000000);
// MAX UNCOMPRESSED MESSAGE SIZE
// properties.put (ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// properties.put (ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
properties.put (ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 300);
properties.put (ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
Consumer:
properties.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaBroker.KEY_DESERIALIZER);
properties.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaBroker.VALUE_DESERIALIZER);
// host1:port1,host2:port2
properties.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
// should be the topic
properties.put (ConsumerConfig.GROUP_ID_CONFIG, groupID);
properties.put (ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.put (ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
Upvotes: 0
Views: 1510
Reputation: 149
In the end, the biggest factor preventing my distributed system from communicating correctly was the producer acks option. Initially we had set it to all (the strictest option), and it seems that this, paired with the degraded network, was preventing Kafka from achieving performance similar to other TCP-based protocols. We now use 0 for unreliable messages and 1 for reliable ones.
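A minimal sketch of the change, reusing the properties and reliable names from the producer setup above (our original configuration used "all"):
// Before: wait for all in-sync replicas to acknowledge
// properties.put (ProducerConfig.ACKS_CONFIG, "all");
// After: acks=0 (fire-and-forget) for unreliable messages,
// acks=1 (leader acknowledgement only) for reliable ones
properties.put (ProducerConfig.ACKS_CONFIG, reliable ? "1" : "0");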
Upvotes: 0
Reputation: 26895
Before trying to change all settings, I'd make a few changes in your logic:
Producer
Currently you are calling flush() after sending each message, effectively doing a synchronous send. This is not recommended, as it forces the Kafka client to make a request to the cluster for every single message, which is pretty inefficient. In most cases, it's best to let the client decide when to actually send messages and not use flush().
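For example, a sketch of the send method from your question without the flush() call; batching is then governed by the producer's linger.ms and batch.size settings (producer and logger are assumed from your code):
boolean sendAsyncMessage (byte[] value, String topic) {
    ProducerRecord<Long, byte[]> record = new ProducerRecord<> (topic, System.currentTimeMillis (), value);
    // no flush(): the client batches and sends records in the background
    producer.send (record, (metadata, exception) -> {
        if (exception != null) {
            logger.error ("Send failed for topic {}", topic, exception);
        }
    });
    return true;
}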
Consumer
In each iteration, you are calling subscribe(); this is not needed. You should only call subscribe() when you want to change the subscription. Also, creating a new thread in each poll() loop is not recommended! In addition to being slow, you risk creating hundreds or thousands of threads if the consumer starts fetching large amounts of messages.
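A sketch of the reworked loop, reusing the names from your question (subscribe once, process on the polling thread):
// subscribe once, outside the loop
subscribe ();
while (keepGoing.get ()) {
    try {
        ConsumerRecords<Long, byte[]> consumerRecords = consumer.poll (Duration.ofMillis (2000));
        // process on the polling thread; no per-iteration thread creation
        manageMessage (consumerRecords);
    } catch (Exception e) {
        logger.error ("Problem in polling", e);
    }
}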
Kafka uses TCP, so lost packets are automatically retransmitted at the transport layer. On top of that, Kafka clients are configured by default to retry most operations and to automatically reconnect to brokers if a connection is lost.
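These retry-related knobs live in the standard producer settings; a sketch with purely illustrative values (not recommendations, derive them from your measured round-trip times):
// per-request timeout; should comfortably exceed the observed latency
properties.put (ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// total time allowed for a send, including all retries
properties.put (ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
// defaults to Integer.MAX_VALUE since Kafka 2.1; retries are
// effectively bounded by delivery.timeout.ms
properties.put (ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);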
When doing your tests, before changing configurations, you should watch how the Kafka client behaves by monitoring its metrics and logs. Are timeouts being reached because of the latency? Are messages being retried?
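For instance, a sketch of dumping a few client-side metrics (both KafkaProducer and KafkaConsumer expose a metrics() method); the names filtered here are examples of metrics worth watching:
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics ().entrySet ()) {
    String name = entry.getKey ().name ();
    // e.g. record-retry-rate, record-error-rate, request-latency-avg
    if (name.contains ("retry") || name.contains ("error") || name.contains ("latency")) {
        logger.info ("{} = {}", name, entry.getValue ().metricValue ());
    }
}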
Upvotes: 1