Reputation: 671
I am still new to Spring Webflux and flatMap on Mono doesn't seem to work.
I have the following function and call to kafkaPublisher.publishToTopic is not working. I inserted the print statement to test if it prints anything and it doesn't even execute the print statement. publishToTopic returns Mono<Void>
.
private Mono<Void> test(Long gId, UUID pId) {
Mono<UUID> nId = pDao.findNId(pId);
Mono<List<String>> channels = nId.flatMapMany(pDao::findChannels).collectList();
return Mono.zip(nId, channels)
.flatMap(t -> {
System.out.println(t.getT1());
return kafkaPublisher.publishToTopic(gId, t.getT1().toString(), t.getT2());
});
}
It gets invoked if .block
is called on flatMap
as shown below.
private Mono<Void> test(Long gId, UUID pId) {
Mono<UUID> nId = pDao.findNId(pId);
Mono<List<String>> channels = nId.flatMapMany(pDao::findChannels).collectList();
Mono.zip(nId, channels)
.flatMap(t -> {
System.out.println(t.getT1());
return kafkaPublisher.publishToTopic(gId, t.getT1().toString(), t.getT2());
}).block();
return Mono.empty();
}
Upvotes: 3
Views: 11678
Reputation: 775
flatMap hangs indefinitely, Best way to close the Asynchronous operation by converting it into future object .
public Mono<Void> saveNew(NewPre pre) {
return preDao.insert(preference)
.flatMap({
p -> test(p.p(), p.n())
}).toFuture();
}
Upvotes: 1
Reputation: 671
I found my mistake. I wasn't not using the result of test
anywhere in the function where I was calling this test
method. Here is the code I was using to call test
public Mono<Void> saveNew(NewPre pre) {
preDao.insert(pre)
.flatMap(p -> test(p.pId(), p.nId()));
return Mono.empty();
}
I changed it to following and it works.
public Mono<Void> saveNew(NewPre pre) {
return preDao.insert(preference)
.flatMap(p -> test(p.p(), p.n())
.then(Mono.empty()));
}
Upvotes: 4