Rea B.
Rea B.

Reputation: 79

How to call a reactor function from a coroutine context?

The scenario is I'm developing a Kotlin microservice that uses coroutine and one of the libraries I'm using (reactor.Kafka) is expecting a flux publisher. How can I bridge the two? (I'm using kotlinx-coroutines-reactive and kotlinx-coroutines-reactor to bridge the other way around when consuming an event I open mono{} block and call a suspending function but in this case its the other way around).

The function I'm trying to call:

kafkaSender.send(Flux.just(SenderRecord.create(record, "0")))

Upvotes: 0

Views: 1236

Answers (1)

George Leung
George Leung

Reputation: 1552

At first glance, we do not see bridging methods in kotlinx-coroutines-reactor to await from a Flux. But:

Note that Mono and Flux are subclasses of Reactive Streams' Publisher and extensions for it are covered by the kotlinx-coroutines-reactive module.


I think you can simply call .awaitLast().

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/await-last.html

Upvotes: 1

Related Questions