kioli
kioli

Reputation: 735

Combining kotlin flow results

I'm wandering if there is a clean way to launch a series of flows in Kotlin and then, after their resolution, perform further operations based on whether they succeeded or not

For example's sake I need to read all integers from a DB (returning them into a flow), check if they are even or odd against an external API (also returning a flow), and then remove the odd ones from the DB

In code it would be something like this

fun findEven() {
   db.readIntegers()
      .map { listOfInt ->
          listOfInt.asFlow()
             .flatMapMerge { singleInt ->
                httpClient.apiCallToCheckForOddity(singleInt)
                   .catch {
                      // API failure when number is even
                   }
                   .map {
                      // API success when number is odd
                      db.remove(singleInt).collect()
                   }
             }.collect()
      }.collect()
}

But the problem I see with this code is the access to the DB deleting entries done in parallel, and I think a better solution would be to run all API calls and somewhere collect all that failed and all that succeeded, so to be able to do a bulk insertion in the DB only once instead of having multiple coroutines do that on their own

Upvotes: 1

Views: 1239

Answers (1)

Tenfour04
Tenfour04

Reputation: 93834

In my opinion, it's kind of an anti-pattern to produce side effects in map, filter, etc. A side effect like removing items from a database should be a separate step (collect in the case of a Flow, and forEach in the case of a List) for clarity.

The nested flow is also kind of convoluted, since you can directly modify the list as a List.

I think you can do it like this, assuming the API can only check one item at a time.

suspend fun findEven() {
    db.readIntegers()
            .map { listOfInt ->
                listOfInt.filter { singleInt ->
                    runCatching {
                        httpClient.apiCallToCheckForOddity(singleInt)
                    }.isSuccess
                }
            }
            .collect { listOfOddInt ->
                db.removeAll(listOfOddInt)
            }
}

Parallel version, if the API call returns the parameter. (By the way, Kotlin APIs should not throw exceptions on non-programmer errors).

suspend fun findEven() {
    db.readIntegers()
            .map { listOfInt ->
                coroutineScope {
                    listOfInt.map { singleInt ->
                        async {
                            runCatching {
                                httpClient.apiCallToCheckForOddity(singleInt)
                            }
                        }
                    }.awaitAll()
                            .mapNotNull(Result<Int>::getOrNull)
                }
            }
            .collect { listOfOddInt ->
                db.removeAll(listOfOddInt)
            }
}

Upvotes: 3

Related Questions