chom
chom

Reputation: 671

Mono.flatMap is not getting invoked

I am still new to Spring Webflux and flatMap on Mono doesn't seem to work. I have the following function and call to kafkaPublisher.publishToTopic is not working. I inserted the print statement to test if it prints anything and it doesn't even execute the print statement. publishToTopic returns Mono<Void>.

private Mono<Void> test(Long gId, UUID pId) {
    Mono<UUID> nId = pDao.findNId(pId);
    Mono<List<String>> channels = nId.flatMapMany(pDao::findChannels).collectList();
    return Mono.zip(nId, channels)
           .flatMap(t -> {
               System.out.println(t.getT1());
               return kafkaPublisher.publishToTopic(gId, t.getT1().toString(), t.getT2());
           });

}

It gets invoked if .block is called on flatMap as shown below.

private Mono<Void> test(Long gId, UUID pId) {
    Mono<UUID> nId = pDao.findNId(pId);
    Mono<List<String>> channels = nId.flatMapMany(pDao::findChannels).collectList();
    Mono.zip(nId, channels)
           .flatMap(t -> {
               System.out.println(t.getT1());
               return kafkaPublisher.publishToTopic(gId, t.getT1().toString(), t.getT2());
           }).block();
    return Mono.empty();

}

Upvotes: 3

Views: 11678

Answers (2)

dinesh kandpal
dinesh kandpal

Reputation: 775

flatMap hangs indefinitely, Best way to close the Asynchronous operation by converting it into future object .

public Mono<Void> saveNew(NewPre pre) {
return preDao.insert(preference)
                    .flatMap({
                       p -> test(p.p(), p.n())
                      
                    }).toFuture();
}

Upvotes: 1

chom
chom

Reputation: 671

I found my mistake. I wasn't not using the result of test anywhere in the function where I was calling this test method. Here is the code I was using to call test

public Mono<Void> saveNew(NewPre pre) {
    preDao.insert(pre)
                        .flatMap(p -> test(p.pId(), p.nId()));
    return Mono.empty();
}

I changed it to following and it works.

public Mono<Void> saveNew(NewPre pre) {
    return preDao.insert(preference)
                        .flatMap(p -> test(p.p(), p.n())
                            .then(Mono.empty()));
}

Upvotes: 4

Related Questions