freemanpolys
freemanpolys

Reputation: 1998

Spring cloud stream MessageChannel send() always return true

I'm using Spring cloud stream and I'd like to save messages and retry to publish them on the topic when the Kafka server is gone but MessageChannel send() method always return true even if the Kafka/Zookeeper server is stopped.

Can somebody help ?

UPDATE with application.yml content :

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                    mode: raw
                bindings:
                    output:
                        producer:
                            sync: true
            bindings:
                output:
                    destination: topic-notification
                    content-type: application/json

CODE :

@Service
public class SendToKafka {
    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);

                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}

Upvotes: 5

Views: 2259

Answers (1)

Gary Russell
Gary Russell

Reputation: 174799

Kafka is async by default; you need to set sync to true; see the binder producer properties.

sync

Whether the producer is synchronous.

Default: false.

Upvotes: 2

Related Questions