Another Dimension
Another Dimension

Reputation: 25

Launching fire and forget job via kotlin coroutines in Webflux application

I am rewriting very basic webflux application into kotlin coroutines for comparison purposes, meaning instead of reactor API, I am using suspend functions with Flows.

I am using webflux with postgres r2dbc running on the netty.

The code that is meant to be rewritten:

    override fun run(vararg args: String) {
        val sql = StreamUtils.copyToString(schema.inputStream, Charsets.UTF_8)

        val insertProducts = Flux.range(0, 1000)
            .map { Product(null, "product-$it", ThreadLocalRandom.current().nextDouble(10.0, 1000.0)) }
            .collectList()
            .flatMap { productRepository.saveAll(it).then() }


        template.databaseClient.sql(sql)
            .then()
            .then(insertProducts)
            .doFinally { logger.info { "Db data setup is finished." } }
            .subscribe()
    }

What I managed to implement:

    override fun run(vararg args: String) {
        val sqlInit = StreamUtils.copyToString(schema.inputStream, Charsets.UTF_8)

        runBlocking(Dispatchers.IO) {
            val products = mutableListOf<Product>()

            for (i in 1..1000) {
                Product(null, "product-$i", ThreadLocalRandom.current().nextDouble(10.0, 1000.0))
                    .also { products.add(it) }
            }
            val sqlExecutionFlow =
                template.databaseClient.sql(sqlInit)
                    .then()
                    .asFlow()

            val productsFlow = productRepository.saveAll(products) //CoroutineCrudRepository

            sqlExecutionFlow.onCompletion { logger.info { "Database initialized." } }.collect()
            productsFlow.onCompletion { logger.info { "Database filled with products." } }.collect()
        }
    }

It works, however I'd like to know if this is a good way how to do it.

It's a fire-and-forget job that should be triggered on the application startup.

Should I use runBlocking with IO dispatcher or without? Should I rather use GlobalScope for it?

I read it is in general discouraged to use, but this seems like a valid case.

Thanks for any input.

Upvotes: 1

Views: 211

Answers (1)

Marko Topolnik
Marko Topolnik

Reputation: 200168

Should I use runBlocking with IO dispatcher or without?

Your code doesn't actually exercise the dispatcher at all, so you can safely remove that with no change in behavior. The dispatcher comes into action only when you launch more coroutines from runBlocking. The coroutine that starts automatically to execute the block of code you pass to runBlocking runs in the ad-hoc dispatcher it creates on the calling thread, which it occupies until completion.

Should I rather use GlobalScope for it?

By this I think you mean "use GlobalScope.launch", and for your exercise I think that would be a step in the right direction.

If I compare your current solution to the original, I notice that the original is asynchronous in nature and override fun run() completes immediately, before the work is done. On the other hand, your usage of runBlocking results in synchronous operation.

If you replace runBlocking with GlobalScope.launch, you'll get the same asynchronous behavior.

One more point, which could be crucial in your phase of learning: coroutines do not need the declarative programming style to achieve their non-blocking behavior, and that is their main point. This gives you the freedom to write regular, procedural-looking code with if and for, and it will still be non-blocking — as long as you use suspendable API for IO.

This point relates to the initial one as well: as long as you use suspendable APIs, you never need Dispatchers.IO, which is there only as a workaround for the situations where you use a blocking API.

Upvotes: 1

Related Questions