xander27

Reputation: 3104

Don't understand how to make a Flux subscription work in Kotlin

I'm new to reactive programming. I expect to see

test provider started
Beat 1000
Beat 2000

in the logs, but there is only "test provider started" and no Beat or "on complete" messages. It looks like I am missing something.

@Service
class ProviderService {

    @PostConstruct
    fun start(){
        val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
        val provider = Provider("test", hb)
    }

}
////////////////////////

open class Provider(name: String, heartBests: Flux<HeartBeat>) {
    companion object {
        val log = LoggerFactory.getLogger(Provider::class.java)!!
    }

    init {
        log.info("$name provider started")
        heartBests.doOnComplete { log.info("on complete") }
        heartBests.doOnEach { onBeat(it.get().number) }
    }

    fun onBeat(n: Number){
        log.info("Beat $n")
    }
}

/////
class HeartBeat(val number: Number)

Upvotes: 1

Views: 3969

Answers (2)

Simon Baslé

Reputation: 28301

Three pretty common mistakes here:

  • Operators like doOnEach return a new Flux instance with the added behavior, so you need to (re)assign the result to a variable or chain the calls in a fluent style.
  • Nothing happens until you subscribe() (or call a variant of it; the blockXXX methods, for instance, also subscribe under the hood).
  • Such a pipeline is fully asynchronous and runs on a separate thread because its source, interval, is time-based. As a result, control would return from init immediately even if you had subscribed, potentially letting the main thread, and then the app, exit.
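
Putting the three points together, a minimal sketch of the question's Provider might look like the following. It assumes Reactor is on the classpath, uses println instead of the logger to stay self-contained, types HeartBeat.number as Long, and uses doOnNext instead of doOnEach so that only emitted values reach onBeat. The subscription property and the Thread.sleep in main are illustrative additions, not part of the original code.

```kotlin
import reactor.core.Disposable
import reactor.core.publisher.Flux
import java.time.Duration

class HeartBeat(val number: Long)

class Provider(name: String, heartBeats: Flux<HeartBeat>) {
    // Hypothetical property: keeps the Disposable so the subscription can be cancelled later.
    val subscription: Disposable

    init {
        println("$name provider started")
        subscription = heartBeats
            .doOnNext { onBeat(it.number) }          // each operator returns a new Flux, so chain them
            .doOnComplete { println("on complete") } // never fires while the source is infinite
            .subscribe()                             // nothing runs until this call
    }

    fun onBeat(n: Long) {
        println("Beat $n")
    }
}

fun main() {
    val hb = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
    val provider = Provider("test", hb)

    // interval ticks on a scheduler thread, so keep the main thread alive for a while.
    Thread.sleep(3500)
    provider.subscription.dispose()
}
```

Note that Flux.interval emits an incrementing counter starting at 0, so the beats are numbered 0, 1, 2, and so on rather than 1000, 2000. In a long-running Spring application the sleep should not be needed, since something else keeps the JVM alive.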

Upvotes: 3

kurt

Reputation: 1550

In your code, the lambda passed to 'doOnComplete' is never called, because you created an infinite stream. 'doOnEach', like 'map', is an intermediate operation (like map in Java streams): it does not trigger any work by itself. There is another mistake: Reactor operators are meant to be chained in a "fluent" style, because each one returns a new Flux.

Try this simple example:

import reactor.core.publisher.Flux
import java.time.Duration

fun main(args: Array<String>) {
    val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }

    println("start")

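    flux.take(3)                                    // bound the infinite interval so it can complete
            .doOnEach { println("on each $it") }    // sees every Signal, including the final onComplete
            .map { println("before map"); HeartBeat(it.value * 2) }
            .doOnNext { println("on next $it") }    // sees only emitted values
            .doOnComplete { println("on complete") }
            .subscribe { println("subscribe $it") } // nothing runs until subscribe()

    Thread.sleep(5000) // keep the main thread alive while interval ticks on another thread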
}

data class HeartBeat(val value: Long)

Upvotes: 2
