Reputation: 1998
I'm using Spring Cloud Stream and I'd like to save messages and retry publishing them to the topic when the Kafka server is down, but the MessageChannel send() method always returns true, even when the Kafka/ZooKeeper server is stopped.
Can somebody help?
UPDATE with application.yml content:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          zk-nodes: localhost
          mode: raw
        bindings:
          output:
            producer:
              sync: true
      bindings:
        output:
          destination: topic-notification
          content-type: application/json
CODE:
@Service
public class SendToKafka {

    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);
                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}
Upvotes: 5
Views: 2259
Reputation: 174799
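The Kafka producer is asynchronous by default, so send() returns true without waiting for the broker to acknowledge the message. You need to set sync to true; see the binder producer properties:

sync
    Whether the producer is synchronous.
    Default: false.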
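
As a rough illustration of the "save and retry" part of the question, here is a minimal plain-Java sketch. ResendBuffer and its methods are hypothetical names, not part of Spring Cloud Stream, and the Predicate only stands in for a synchronous call such as source.output().send(...). The sketch assumes that, once sync is true, a failed send shows up either as a false return value or as a runtime exception.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/**
 * Hypothetical helper (not part of Spring Cloud Stream): keeps messages whose
 * send failed so they can be retried once the broker is reachable again.
 */
class ResendBuffer<T> {
    // Stand-in for a synchronous send such as source.output().send(...):
    // assumed to return false or throw when the message was not delivered.
    private final Predicate<T> sender;
    private final Deque<T> pending = new ArrayDeque<>();

    ResendBuffer(Predicate<T> sender) {
        this.sender = sender;
    }

    /** Tries to send; on a false result or an exception the message is kept for later. */
    synchronized boolean send(T message) {
        if (trySend(message)) {
            return true;
        }
        pending.addLast(message);
        return false;
    }

    /** Resends saved messages in order, stopping at the first failure; returns how many were delivered. */
    synchronized int retryPending() {
        int delivered = 0;
        while (!pending.isEmpty() && trySend(pending.peekFirst())) {
            pending.removeFirst();
            delivered++;
        }
        return delivered;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    private boolean trySend(T message) {
        try {
            return sender.test(message);
        } catch (RuntimeException e) {
            // Assumption: a failed synchronous send surfaces as a runtime exception.
            return false;
        }
    }
}
```

In the service from the question, the predicate could wrap the existing source.output().send(MessageBuilder.withPayload(notif).build()) call, and the in-memory queue could be swapped for notificationFileService so that saved notifications survive a restart.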
Upvotes: 2