Reputation:
One way of achieving it could be by setting the producer property max.in.flight.requests.per.connection = 1.
But I want to know if there is a more direct or alternative way of sending messages synchronously in Kafka, something like producer.syncSend(...).
Upvotes: 10
Views: 36367
Reputation: 1341
If you are not looking for an enterprise solution, see this: https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1
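To give an idea of what that article describes, below is a rough client-side sketch of the request-reply pattern using spring-kafka's ReplyingKafkaTemplate. The broker address, the "requests"/"replies" topic names and the group id are assumptions, and the responder service that actually produces the reply (e.g. a @KafkaListener with @SendTo) is omitted:
// rough sketch, assumes the spring-kafka dependency is on the classpath
Map<String, Object> producerProps = Map.of(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Map<String, Object> consumerProps = Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
        ConsumerConfig.GROUP_ID_CONFIG, "replies-group",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

// listener container that receives the correlated replies on the assumed "replies" topic
KafkaMessageListenerContainer<String, String> repliesContainer =
        new KafkaMessageListenerContainer<>(cf, new ContainerProperties("replies"));

ReplyingKafkaTemplate<String, String, String> template =
        new ReplyingKafkaTemplate<>(pf, repliesContainer);
template.start();

// send the request and block until the reply (or a timeout) arrives
RequestReplyFuture<String, String, String> future =
        template.sendAndReceive(new ProducerRecord<>("requests", "my request"));
ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
System.out.println(reply.value());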
Upvotes: 0
Reputation: 399
When max.in.flight.requests.per.connection = 1, it just means that the ordering of messages is guaranteed within a partition; it has nothing to do with synchronization.
Python code in case it helps. For a synchronous send, make sure to block on the future with a reasonable timeout.
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

# by default acks=1; with acks='all' the producer waits for acknowledgement
# from all in-sync replicas
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], acks='all')

key = b'key'
value = b'value'

future = producer.send("my-topic", key=key, value=value)

# block on this future for a synchronous send
try:
    record_metadata = future.get(timeout=10)
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
except KafkaError:
    log.exception("Failed to send message")

producer.flush()
producer.close()
Upvotes: 2
Reputation: 1054
From my adventures with Kafka :-) the order of message production can only be guaranteed if you have a single producer thread and set max.in.flight.requests.per.connection = 1 (or turn off retries, i.e. retries = 0, or both).
If you want to scale to more than one producer, then you have to "make sure" that messages destined for the same partition are produced by the same producer instance.
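As a rough illustration of those settings, here is a minimal producer configuration sketch in Java; the broker address and serializer choices are assumptions:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// keep ordering within a partition even when a send is retried
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// alternatively (or additionally), disable retries
props.put(ProducerConfig.RETRIES_CONFIG, "0");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);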
Upvotes: 0
Reputation: 807
As Thilo suggested, you can call Future#get to block until the sending has completed. However, you might run into a performance issue, since the producer only starts sending when the current batch reaches batch.size bytes, when the buffer of size buffer.memory is full, or after linger.ms milliseconds.
If you have a limited number of threads pushing to Kafka, you may have to wait up to linger.ms each time for your message to be sent. So in some cases, you will prefer using:
// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush the producer queue to avoid the queuing delay
producer.flush();
// block until the send completes; throws if Kafka is unreachable
future.get(10, TimeUnit.SECONDS);
Upvotes: 7
Reputation: 10075
The answer proposed by Thilo is the way to go. In general, your suggestion of using max.in.flight.requests.per.connection = 1 is about keeping retries enabled without losing message ordering; it is not really meant for having a sync producer.
Upvotes: 1
Reputation: 262814
The producer API returns a Future from send(). You can call Future#get to block until the sending has completed.
See this example from the Javadocs:
If you want to simulate a simple blocking call you can call the get() method immediately:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
    new ProducerRecord<byte[],byte[]>("my-topic", key, value);
producer.send(record).get();
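For completeness, here is a self-contained sketch of that blocking pattern; the broker address, topic name, and acks setting are assumptions, not part of the Javadoc example:
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class SyncSendExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // acks=all: wait for all in-sync replicas to acknowledge the write
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>("my-topic", "key".getBytes(), "value".getBytes());
            // get() blocks until the broker has acknowledged the record (or the send fails)
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("Written to %s-%d at offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}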
Upvotes: 16