andrii

Reputation: 1388

Kafka producer is losing messages when broker is down

Given the following scenario:

I bring up ZooKeeper and a single Kafka broker on my local machine and create a "test" topic as described in the Kafka quickstart: https://kafka.apache.org/quickstart

Then, I run a simple Java program that produces a message to the "test" topic every second. After some time I bring down my local Kafka broker and see that the producer keeps producing messages without throwing any exception. Finally, I bring the Kafka broker up again; the producer reconnects and continues producing messages, but all the messages produced during the broker's downtime are lost. The producer doesn't replay them when it detects that the broker is healthy again.

How can I prevent this? I want the Kafka producer to replay those messages once it detects that the broker is back online. Here is my producer config:

props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 0);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Upvotes: 1

Views: 5882

Answers (2)

Daniel

Reputation: 783

The Kafka Producer library has a retry mechanism built in, but it is turned off by default. Set the retries producer config to a value bigger than 0 (the default) to turn it on. You should also experiment with retry.backoff.ms and request.timeout.ms to tune the producer's retry behaviour.

Example Kafka Producer config with enabled retries:

retries=2147483647         // Integer.MAX_VALUE
retry.backoff.ms=1000
request.timeout.ms=305000  // just over 5 minutes
max.block.ms=2147483647    // Integer.MAX_VALUE

You can find more information about those properties in the Apache Kafka documentation.
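
Applied to the question's Java config, the same settings might look like this (a sketch; the values are just the illustrative ones above):

props.put("retries", Integer.MAX_VALUE);        // turn retries on (the default is 0)
props.put("retry.backoff.ms", 1000);            // wait 1 second between retry attempts
props.put("request.timeout.ms", 305000);        // fail a request after just over 5 minutes
props.put("max.block.ms", Integer.MAX_VALUE);   // let send() and metadata fetches block indefinitely

Note that with retries enabled, a retried batch can land after a batch that was sent later; set max.in.flight.requests.per.connection=1 if message ordering matters to you.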

Upvotes: 2

Lalit

Reputation: 2014

Since you're running just one broker, I'm afraid you won't be able to store messages when your broker is down.

However, it is strange that you don't get any exceptions, warnings, or errors when you bring your broker down.

I would expect a "Failed to update metadata" or "expiring messages" error, because when the producer sends messages to the broker(s) listed in the bootstrap.servers property, it first fetches metadata from them to find the partition leaders. So, in your case, since you're running Kafka in stand-alone mode, the producer should fail to refresh its metadata while the broker is down and eventually error out.
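
One way to surface those errors instead of losing them silently is to pass a Callback to send(). A minimal sketch, assuming the producer and topic from the question:

producer.send(new ProducerRecord<>("test", "hello"), (metadata, exception) -> {
    if (exception != null) {
        // while the broker is down, this eventually fires with a
        // TimeoutException once the batch expires
        exception.printStackTrace();
    }
});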

Could you please check what the following properties are set to:

request.timeout.ms
max.block.ms

and experiment with these values (perhaps reducing them) and check the results?

One more option you might want to try is sending messages to Kafka synchronously, i.e. blocking on the Future returned by send() until the record is acknowledged. Here's a code snippet that might help (taken from this documentation reference):

If you want to simulate a simple blocking call you can call the get() method immediately:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
producer.send(record).get();

In this case, the get() call will throw an exception if the message is not sent successfully for any reason.
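
To see the failure explicitly, you can wrap the blocking call like this (a sketch; ExecutionException wraps the actual send error):

import java.util.concurrent.ExecutionException;

try {
    producer.send(record).get();   // blocks until the broker acknowledges the write
} catch (ExecutionException e) {
    // e.getCause() is the real failure, e.g. a TimeoutException while the
    // broker is down; the message was NOT written
    e.getCause().printStackTrace();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}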

I hope this helps.

Upvotes: 0
