Reputation: 3104
I'm new to reactive programming. I expect to see
test provider started
Beat 1000
Beat 2000
in logs but there is only test provider started
and no Beat
or on complete
messages. Looks like I miss something
@Service
class ProviderService {
@PostConstruct
fun start(){
val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
val provider = Provider("test", hb)
}
}
////////////////////////
open class Provider(name: String, heartBests: Flux<HeartBeat>) {
companion object {
val log = LoggerFactory.getLogger(Provider::class.java)!!
}
init {
log.info("$name provider started")
heartBests.doOnComplete { log.info("on complete") }
heartBests.doOnEach { onBeat(it.get().number) }
}
fun onBeat(n: Number){
log.info("Beat $n")
}
}
/////
class HeartBeat(val number: Number)
Upvotes: 1
Views: 3969
Reputation: 28301
three pretty common mistakes here:
doOnEach
return a new Flux
instance with the added behavior, so you need to (re)assign to a variable or use a fluent stylesubscribe()
(or a variant of it. blockXXX
do also subscribe
under the hood for instance...)Thread
due to the time dimension of the source, interval
. As a result, control would immediately return in init
even if you had subscribed, potentially causing the main thread and then the app to exit.Upvotes: 3
Reputation: 1550
In your code lambda from 'doOnComplete' has been never called, because you created infinite stream. Method 'doOnEach' as 'map' is intermediate operations (like map
in streams), its doesn't make a call.
And you have another mistake, reactive suggests "fluent pattern".
Try this simple example:
import reactor.core.publisher.Flux
import java.time.Duration
fun main(args: Array<String>) {
val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
println("start")
flux.take(3)
.doOnEach { println("on each $it") }
.map { println("before map");HeartBeat(it.value * 2) }
.doOnNext { println("on next $it") }
.doOnComplete { println("on complete") }
.subscribe { println("subscribe $it") }
Thread.sleep(5000)
}
data class HeartBeat(val value: Long)
Upvotes: 2