Reputation: 265
I'm a bit confused about the relationship between max.in.flight.requests.per.connection for Kafka producers and synchronous publishing of events using Spring Kafka, and was hoping someone could clear it up.
I'm looking to set up synchronous event publishing using Spring Kafka's KafkaTemplate. The Spring Kafka documentation provides an example that uses ListenableFuture's get(SOME_TIME, TimeUnit) to enable synchronous publishing of events (duplicated below for reference).
    public void sendToKafka(final MyOutputData data) {
        final ProducerRecord<String, String> record = createRecord(data);
        try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
        }
        catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
        }
        catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
        }
    }
I was looking at Kafka's producer configuration documentation and saw a setting called max.in.flight.requests.per.connection, which is described as follows:
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
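For reference, here is a minimal sketch of how this setting might be supplied to a producer, using only plain key/value pairs. The config keys are standard Kafka producer property names; the class name, method name, broker address, and the particular values for acks and retries are assumptions chosen for illustration, and the serializer settings a real producer needs are omitted.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerOrderingConfig {

    // Builds producer settings as plain key/value pairs.
    // Values here are illustrative, not recommendations.
    public static Map<String, Object> orderingSafeProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        // Retries are what make re-ordering possible when more than one
        // request may be in flight on a connection.
        props.put("retries", 3);
        // At most one unacknowledged request per broker connection.
        props.put("max.in.flight.requests.per.connection", 1);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(orderingSafeProps("localhost:9092"));
    }
}
```

A map like this could be passed to whatever builds the producer (for example a producer factory), together with the key and value serializer settings.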
What benefit does setting max.in.flight.requests.per.connection to 1 provide when event publishing is handled asynchronously? Does setting max.in.flight.requests.per.connection to 1 force synchronous publishing of events for a Kafka producer? If I want synchronous publishing and take the approach recommended by Spring Kafka, should I be concerned about max.in.flight.requests.per.connection, or is it safe to ignore?
Upvotes: 1
Views: 2482
Reputation: 174574
I don't believe they are related at all. The send is still asynchronous; setting it to 1 means the producer will not send a second request to a broker until the first has been acknowledged. The send() call itself still returns a future.
    future1 = template.send(...);
    future2 = template.send(...); // returns a future; its request is held back until the first is acknowledged
    future1.get(); // waits for the first result
    future2.get(); // waits for the second result
You still need to get the result of each future to test for success or failure.
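To make the point concrete without a broker, here is a small simulation using only the JDK. It assumes a hypothetical fakeSend method standing in for template.send(...), which completes on another thread after a delay; none of these names come from Spring Kafka. It only illustrates that the caller becomes synchronous by blocking on the future with a timeout, regardless of any producer setting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncSendSketch {

    // Hypothetical stand-in for template.send(...): the returned future
    // completes on another thread after delayMillis.
    public static CompletableFuture<String> fakeSend(String payload, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "ack:" + payload;
        });
    }

    // Blocks the caller until the simulated acknowledgement arrives
    // or the timeout expires; returns whether the send succeeded in time.
    public static boolean sendAndWait(String payload, long delayMillis, long timeoutMillis) {
        try {
            fakeSend(payload, delayMillis).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Acknowledgement arrives well inside the timeout.
        System.out.println(sendAndWait("a", 10, 1000));
        // Acknowledgement is slower than the timeout, so the caller gives up.
        System.out.println(sendAndWait("b", 500, 50));
    }
}
```

The success or failure decision is made at the get(...) call, which is why the future's result still has to be checked.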
Upvotes: 2