Reputation: 4463
I want to iterate over a sequence of objects and return the first non-null of an async call.
The point is to perform some kind of async operation that might fail, and I have a series of fallbacks that I want to try in order, one after the other (i.e. lazily / not in parallel).
I've tried to do something similar to what I'd do if it were a sync call:
// ccs: List<CurrencyConverter>
override suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs.asSequence()
        .map { it.getExchangeRateAsync(from, to) }
        .firstOrNull { it != null }
        ?: throw CurrencyConverterException()
IntelliJ complains:
Suspension functions can only be called within coroutine body
Edit: To clarify, this works as expected if mapping on a List, but I want to see how I'd do this on a sequence.
So I guess this is because the map lambda isn't suspended? But I'm not sure how to actually do that. I tried a bunch of different ways but none seemed to work. I couldn't find any examples.
If I rewrite this in a more procedural style, using a for loop with an async block, I can get it working:
override suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal {
    for (cc in ccs) {
        // Each converter is awaited before the next one is tried, so the calls run one at a time.
        val res: BigDecimal? = async {
            cc.getExchangeRateAsync(from, to)
        }.await()
        if (res != null) {
            return res
        }
    }
    throw CurrencyConverterException()
}
Upvotes: 13
Views: 12720
Reputation: 111
I would suggest replacing Sequence with Flow. The Flow API and its behavior are much the same as Sequence's, but its operators accept suspending lambdas.
https://kotlinlang.org/docs/reference/coroutines/flow.html
Code:
override suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs.asFlow()
        .map { it.getExchangeRateAsync(from, to) }
        .firstOrNull { it != null }
        ?: throw CurrencyConverterException()
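To check that the Flow version really tries the fallbacks one at a time, here is a minimal self-contained sketch. It assumes kotlinx.coroutines 1.4 or later is on the classpath. `CurrencyConverter`, `CurrencyConverterException`, `FakeConverter` and `firstRate` are hypothetical stand-ins for the types in the question, not the asker's real code.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import java.math.BigDecimal

// Hypothetical stand-ins for the question's types.
class CurrencyConverterException : Exception()

interface CurrencyConverter {
    suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal?
}

// Fake converter that returns a fixed rate (or null) and counts its invocations.
class FakeConverter(private val rate: BigDecimal?) : CurrencyConverter {
    var calls = 0
        private set

    override suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal? {
        calls++
        return rate
    }
}

// A Flow is cold and sequential: each element is mapped only when the
// collector asks for it, and firstOrNull stops collecting at the first match.
suspend fun firstRate(ccs: List<CurrencyConverter>, from: String, to: String): BigDecimal =
    ccs.asFlow()
        .map { it.getExchangeRateAsync(from, to) }
        .firstOrNull { it != null }
        ?: throw CurrencyConverterException()
```

Calling `firstRate` with three fake converters, where the first returns null and the second returns a rate, should leave the third converter with zero calls.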
Upvotes: 7
Reputation: 11
FWIW, I found the suggestion in "How to asynchronously map over sequence" to be very intuitive. The code at https://github.com/Kotlin/kotlin-coroutines-examples/blob/master/examples/suspendingSequence/suspendingSequence.kt defines SuspendingIterator, which allows next() to suspend, then builds SuspendingSequence on top of it. Unfortunately, you need to duplicate extension functions like flatMap(), filter(), etc., since SuspendingSequence can't be related to Sequence, but I did this and am much happier with the result than using a Channel.
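A rough sketch of the idea follows. It is not the code from the linked example, only an illustration written for this thread: the interface shapes and the `asSuspendingSequence`, `map` and `firstOrNull` helpers are assumptions of mine, hand-written because the standard `Sequence` extensions cannot be reused.

```kotlin
// Illustrative only: an iterator whose hasNext()/next() may suspend.
interface SuspendingIterator<out T> {
    suspend fun hasNext(): Boolean
    suspend fun next(): T
}

interface SuspendingSequence<out T> {
    fun iterator(): SuspendingIterator<T>
}

// Adapts an ordinary Iterable; nothing is evaluated until it is iterated.
fun <T> Iterable<T>.asSuspendingSequence(): SuspendingSequence<T> {
    val upstream = this
    return object : SuspendingSequence<T> {
        override fun iterator(): SuspendingIterator<T> {
            val source = upstream.iterator()
            return object : SuspendingIterator<T> {
                override suspend fun hasNext(): Boolean = source.hasNext()
                override suspend fun next(): T = source.next()
            }
        }
    }
}

// Lazy map: transform runs only when next() is called, and it may suspend.
fun <T, R> SuspendingSequence<T>.map(transform: suspend (T) -> R): SuspendingSequence<R> {
    val upstream = this
    return object : SuspendingSequence<R> {
        override fun iterator(): SuspendingIterator<R> {
            val source = upstream.iterator()
            return object : SuspendingIterator<R> {
                override suspend fun hasNext(): Boolean = source.hasNext()
                override suspend fun next(): R = transform(source.next())
            }
        }
    }
}

// Terminal operation: pulls elements one by one and stops at the first match.
suspend fun <T> SuspendingSequence<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
    val iterator = iterator()
    while (iterator.hasNext()) {
        val element = iterator.next()
        if (predicate(element)) return element
    }
    return null
}
```

With these in place, `ccs.asSuspendingSequence().map { it.getExchangeRateAsync(from, to) }.firstOrNull { it != null }` would have the same shape as the Sequence code in the question, at the cost of maintaining every operator yourself.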
Upvotes: 1
Reputation: 11165
You are getting an error because Sequence is lazy and its map isn't an inline function, so the lambda you pass to it isn't part of the suspending function's body and can't call suspending functions.
You can avoid using Sequence by creating a list of lazy coroutines:
// ccs: List<CurrencyConverter>
suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs
        .map { async(start = CoroutineStart.LAZY) { it.getExchangeRateAsync(from, to) } }
        .firstOrNull { it.await() != null }
        ?.getCompleted() ?: throw Exception()
This doesn't give any errors and seems to work, but I'm not sure it's idiomatic.
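Because the collection operators on List are inline, their lambdas can call suspending functions directly, so a short-circuiting operator may be enough without any coroutine builders. A minimal sketch, assuming Kotlin 1.5 or later for `firstNotNullOfOrNull`; `CurrencyConverter`, `CurrencyConverterException` and `firstRate` are hypothetical stand-ins for the question's types:

```kotlin
import java.math.BigDecimal

// Hypothetical stand-ins for the question's types.
class CurrencyConverterException : Exception()

interface CurrencyConverter {
    suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal?
}

// firstNotNullOfOrNull is inline, so the lambda is compiled into this
// suspending function and may suspend. It stops at the first non-null
// result, so later converters are never called.
suspend fun firstRate(ccs: List<CurrencyConverter>, from: String, to: String): BigDecimal =
    ccs.firstNotNullOfOrNull { it.getExchangeRateAsync(from, to) }
        ?: throw CurrencyConverterException()
```

Unlike `map` followed by `firstOrNull` on a List, this does not evaluate every converter up front.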
Upvotes: 2