user8339674

Reputation:

How to send messages synchronously in Kafka?

One way of achieving this could be to set the producer configuration property max.in.flight.requests.per.connection = 1.

But I want to know if there is an even more direct or alternative way of sending messages synchronously in Kafka, something like producer.syncSend(...).

Upvotes: 10

Views: 36367

Answers (6)

Amir Esmaeilzadeh

Reputation: 1341

If you are not looking for an enterprise solution, see this: https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1
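
A rough sketch of what that request-reply approach looks like, assuming Spring Kafka's ReplyingKafkaTemplate is already wired up (reply container and topic configuration omitted; the topic name and timeout below are placeholders, not from the article):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;

// assumes a configured ReplyingKafkaTemplate<String, String, String> bean
static String sendAndWaitForReply(ReplyingKafkaTemplate<String, String, String> template,
                                  String payload) throws Exception {
    ProducerRecord<String, String> request = new ProducerRecord<>("request-topic", payload);
    RequestReplyFuture<String, String, String> future = template.sendAndReceive(request);
    // blocks until a consumer publishes a reply on the configured reply topic (or times out)
    ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
    return reply.value();
}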

Upvotes: 0

Smalltalkguy

Reputation: 399

Setting max.in.flight.requests.per.connection = 1 only guarantees the ordering of messages within a partition; it has nothing to do with synchronization.

Python example, in case it helps. For a synchronous send, make sure to block on the future with a reasonable timeout.

import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

# by default acks=1; with acks='all' the broker waits for acks from all in-sync replicas
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], acks='all')

key = b'key'
value = b'value'

# send() is asynchronous and returns a future
future = producer.send("my-topic", key=key, value=value)

# block on this future for sync sends
try:
    record_metadata = future.get(timeout=10)
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
except KafkaError:
    log.exception("Failed to send message to Kafka")

producer.flush()
producer.close()

Upvotes: 2

Vassilis

Reputation: 1054

From my adventures with Kafka :-) the order of message production can only be guaranteed if you have a single Producer thread and set max.in.flight.requests.per.connection = 1 (or turn off retries, i.e. retries = 0, or both).

If you want to scale to more than one Producer, then you have to "make sure" that messages destined for the same partition are produced by the same Producer instance.
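
For illustration only (this answer has no code): one way to keep same-partition messages on a single producer instance is to pick the partition explicitly when building the record and route those records through that one producer at the application level. The topic name, partition number, and the producer variable below are placeholders.

import org.apache.kafka.clients.producer.ProducerRecord;

// all records destined for partition 0 go through this single producer instance
ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", 0, "key", "value");
producer.send(record);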

Upvotes: 0

Camille Vienot

Reputation: 807

As Thilo suggested, you can call Future#get to block until the sending has completed. However, you might run into performance issues, since the producer only starts sending when the producer queue has batch.size elements, when the buffer of buffer.memory bytes is full, or after linger.ms milliseconds.

If you have a limited number of threads pushing to Kafka, you will have to wait up to linger.ms each time for your message to be sent. So in some cases, you will prefer using:

// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// wait for the send to complete; throws if Kafka is unreachable
future.get(10, TimeUnit.SECONDS);

Upvotes: 7

ppatierno

Reputation: 10075

The answer Thilo proposed is the way to go. In general, your suggestion of using max.in.flight.requests.per.connection = 1 is for keeping retries enabled without losing message ordering; it's not really about having a sync producer.
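
A minimal sketch of that configuration (broker address and retry count are placeholders, not from the answer): retries stay enabled while limiting in-flight requests to one preserves per-partition ordering.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, 3);                        // retries stay enabled
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // keeps ordering despite retries

KafkaProducer<String, String> producer = new KafkaProducer<>(props);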

Upvotes: 1

Thilo

Reputation: 262814

The producer API returns a Future from send. You can call Future#get to block until the sending has completed.

See this example from the Javadocs:

If you want to simulate a simple blocking call you can call the get() method immediately:

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = 
     new ProducerRecord<byte[],byte[]>("my-topic", key, value);
 producer.send(record).get();

Upvotes: 16
