chroder
chroder

Reputation: 4463

How to asynchronously map over sequence

I want to iterate over a sequence of objects and return the first non-null of an async call.

The point is to perform some kind of async operation that might fail, and I have a series of fallbacks that I want to try in order, one after the other (i.e. lazily / not in parallel).

I've tried to do something similar to what I'd do if it were a sync call:

// ccs: List<CurrencyConverter>
override suspend fun getExchangeRateAsync(from: String, to: String) =
  ccs.asSequence()
    .map { it.getExchangeRateAsync(from, to) }
    .firstOrNull { it != null }
    ?: throw CurrencyConverterException()

IntelliJ complains:

Suspension functions can only be called within coroutine body

Edit: To clarify, this works as expected if mapping on a List, but I want to see how I'd do this on a sequence.

So I guess this is because the map lambda isn't suspended? But I'm not sure how to actually do that. I tried a bunch of different ways but none seemed to work. I couldn't find any examples.

If I re-write this in a more procedural style using a for loop with an async block, I can get it working:

override suspend fun getExchangeRateAsync(from: String, to: String) {
    for (cc in ccs) {
        var res: BigDecimal? = async {
            cc.getExchangeRateAsync(from, to)
        }.await()

        if (res != null) {
            return res
        }
    }

    throw CurrencyConverterException()
}

Upvotes: 13

Views: 12720

Answers (3)

jlinhart
jlinhart

Reputation: 111

I would suggest replacing Sequence with Flow. Flow api and behavior is pretty much same as for Sequence, but with suspending options.

https://kotlinlang.org/docs/reference/coroutines/flow.html

Code:

override suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs.asFlow()
    .map { it.getExchangeRateAsync(from, to) }
    .firstOrNull { it != null }
    ?: throw CurrencyConverterException()

Upvotes: 7

jrhy
jrhy

Reputation: 11

FWIW, I found the suggestion in How to asynchronously map over sequence to be very intuitive. The code at https://github.com/Kotlin/kotlin-coroutines-examples/blob/master/examples/suspendingSequence/suspendingSequence.kt defines SuspendingIterator which allows next() to suspend, then builds SuspendingSequence on top of it. Unfortunately, you need to duplicate extension functions like flatMap(), filter(), etc. since SuspendingSequence can't be related to Sequence, but I did this and am much happier with the result than using a Channel.

Upvotes: 1

Dmytro Rostopira
Dmytro Rostopira

Reputation: 11165

You are getting an error, because Sequence is lazy by default and it's map isn't an inline function, so it's scope isn't defined

You can avoid using Sequence by creating a list of lazy coroutines

// ccs: List<CurrencyConverter>
suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs
    .map { async(start = CoroutineStart.LAZY) { it.getExchangeRateAsync(from, to) } }
    .firstOrNull { it.await() != null }
    ?.getCompleted() ?: throw Exception()

This doesn't give any errors and seems to be working. But I'm not sure it's an idiomatic way

Upvotes: 2

Related Questions