Reputation: 13668
I have a flow which could possibly throw an error, like so:
val myFlow = flow {
    emit("1")
    delay(2000)
    emit("2")
    delay(2000)
    emit("3")
    delay(2000)
    emit("4")
    delay(2000)
    throw Exception() // here it would throw an error
    delay(10000)
    emit("6") // because the flow completes on error, it doesn't emit this
}
My problem is that when the error is thrown, even if I add a .catch { error -> emit("5") }, it still completes the flow, and so "6" isn't emitted.
myFlow.catch { error ->
    emit("5")
}.onEach {
    println("$it")
}.onCompletion {
    println("Complete")
}.launchIn(scope)
And the result is:
1
2
3
4
5
Complete
I need it to be:
1
2
3
4
5
6
Complete
I want to swallow the error instead of making the flow complete. How can I achieve this?
Upvotes: 5
Views: 7250
Reputation: 9
I encountered the same problem today. Here is my solution, built on a few extension functions:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        flow {
            for (i in 0..10) {
                emit(i)
            }
        }
            .allowErrors()
            .onEachSuccess {
                if (it == 3 || it == 6) {
                    throw RuntimeException("$it !!!")
                }
            }
            .mapSuccess { "Success :$it; " }
            .onEachError {
                println("ERROR: ${it.message}")
            }
            .reduceSuccess { a, b -> a + b }
            .also { println(it) }
    }
}

fun <T> Flow<T>.allowErrors(): Flow<Pair<T?, Throwable?>> = transform { value ->
    return@transform emit(value to null)
}

fun <T> Flow<Pair<T?, Throwable?>>.filterSuccess(predicate: suspend (T) -> Boolean): Flow<Pair<T?, Throwable?>> =
    transform { value ->
        if (value.second == null) {
            try {
                if (predicate(value.first!!)) return@transform emit(value)
            } catch (e: Exception) {
                emit(value.first to e)
            }
        } else {
            return@transform emit(value)
        }
    }

fun <T> Flow<Pair<T?, Throwable?>>.onEachSuccess(action: suspend (T) -> Unit): Flow<Pair<T?, Throwable?>> =
    transform { value ->
        if (value.second == null) {
            try {
                action(value.first!!)
                return@transform emit(value)
            } catch (e: Exception) {
                emit(value.first to e)
            }
        } else {
            return@transform emit(value)
        }
    }

fun <T, R> Flow<Pair<T?, Throwable?>>.mapSuccess(transform: suspend (T) -> R): Flow<Pair<R?, Throwable?>> =
    transform { value ->
        if (value.second == null) {
            try {
                return@transform emit(transform(value.first!!) to null)
            } catch (e: Exception) {
                emit(null to e)
            }
        } else {
            return@transform emit(null to value.second)
        }
    }

fun <T> Flow<Pair<T?, Throwable?>>.onEachError(action: suspend (Throwable) -> Unit): Flow<Pair<T?, Throwable?>> =
    transform { value ->
        if (value.second != null) {
            action(value.second!!)
            return@transform emit(value)
        } else {
            return@transform emit(value)
        }
    }
suspend fun <T> Flow<Pair<T?, Throwable?>>.collectSuccess(collector: FlowCollector<T>) {
    this@collectSuccess.filter { it.second == null }.map { it.first!! }.collect(collector)
}

suspend fun <S, T : S> Flow<Pair<T?, Throwable?>>.reduceSuccess(operation: suspend (accumulator: S, value: T) -> S): S {
    return this@reduceSuccess.filter { it.second == null }.map { it.first!! }.reduce(operation)
}
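The same idea can be sketched more compactly with Kotlin's built-in Result type in place of Pair<T?, Throwable?>. This is only an illustrative variant, not part of the solution above: mapToResult and summarize are hypothetical names, and it assumes a reasonably recent Kotlin and kotlinx-coroutines-core. CancellationException is rethrown so that wrapping failures does not interfere with coroutine cancellation.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator: applies block to each element and wraps the outcome
// in Result, so one failing element does not terminate the flow.
fun <T, R> Flow<T>.mapToResult(block: suspend (T) -> R): Flow<Result<R>> = map { value ->
    try {
        Result.success(block(value))
    } catch (e: CancellationException) {
        throw e // never swallow cancellation
    } catch (e: Exception) {
        Result.failure<R>(e)
    }
}

// Hypothetical demo: element 3 fails, the others are mapped and concatenated.
fun summarize(): String = runBlocking {
    (0..4).asFlow()
        .mapToResult { if (it == 3) throw RuntimeException("$it !!!") else "Success :$it; " }
        .onEach { result -> result.exceptionOrNull()?.let { println("ERROR: ${it.message}") } }
        .mapNotNull { it.getOrNull() }
        .toList()
        .joinToString("")
}

fun main() {
    println(summarize())
}
```

Note that, as with the Pair-based version, this only isolates failures thrown by the per-element operation. An exception thrown by the upstream flow builder itself still terminates the flow.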
Upvotes: 0
Reputation: 2896
OK, I know it's not exactly the same example, but I suspect my case is somewhat similar.
Let's say you have some kind of risky flow. The flow may throw an exception, but you want to stay connected to it even after the exception because it emits something important, e.g. real-time server updates.
Your goal is to suppress the exception, or convert it to some data, and keep listening to the real-time updates no matter what.
var counter = 0
val riskyFlow = flow {
    while (true) {
        counter++
        delay(1000)
        if (counter == 3)
            throw IllegalStateException("Oops! Error.")
        else
            emit("Server update")
    }
}
If you use catch, riskyFlow will complete right after you emit whatever you want to emit on error.
riskyFlow
    .catch { cause -> emit("Emit on error") }
    .onEach { println(it) }
    .launchIn(GlobalScope)
Solution
Use retry or retryWhen. This way you suppress the exception by emitting some data in its place, and the flow is then collected again right away, so it continues emitting its data.
riskyFlow
    .retryWhen { cause, attempt ->
        emit("emit on error")
        true
    }
    .onEach { println(it) }
    .launchIn(GlobalScope)
The output is:
I/System.out: Server update
I/System.out: Server update
I/System.out: emit on error
I/System.out: Server update
I/System.out: Server update
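One caveat worth spelling out: retryWhen collects the upstream flow again from the start, so it only looks like "continuing" when the state lives outside the flow builder, as counter does above. The sketch below applies the same operator to a flow that fails at a fixed point, similar to the one in the question, and caps the number of retries so that it terminates. collectWithRetry and the "gave up" marker are hypothetical names used for illustration, and the code assumes kotlinx-coroutines-core is available.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a flow that always fails after emitting "1" and "2",
// retrying at most maxRetries times and emitting a marker before each retry.
fun collectWithRetry(maxRetries: Long): List<String> = runBlocking {
    flow<String> {
        emit("1")
        emit("2")
        throw IllegalStateException("Oops! Error.")
    }
        .retryWhen { _, attempt ->
            // attempt starts at 0 and counts the retries made so far
            if (attempt < maxRetries) {
                emit("emit on error")
                true
            } else {
                false
            }
        }
        .catch { emit("gave up") } // reached once the retries are exhausted
        .toList()
}

fun main() {
    println(collectWithRetry(maxRetries = 1))
    // [1, 2, emit on error, 1, 2, gave up]
}
```

Because "1" and "2" are emitted again after the retry, this approach suits flows that reconnect to a live source, and is a poor fit when the earlier values must not be repeated.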
Upvotes: 6
Reputation: 598
This is not possible in your current example, since the last two lines in your flow are unreachable.
You should deal with the exception inside your flow, meaning catch the exception in the flow and emit "5" there.
Like this:
val myFlow = flow {
    emit("1")
    delay(2000)
    emit("2")
    delay(2000)
    emit("3")
    delay(2000)
    emit("4")
    delay(2000)
    try {
        throw Exception() // here it would throw an error
    } catch (e: Exception) {
        emit("5")
    }
    delay(10000)
    emit("6") // reached now, because the exception was handled inside the flow
}
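A minimal runnable sketch of the same approach follows, with the failing step pulled out into a function so that only that call sits inside try. riskyCall, resilientFlow and collectAll are hypothetical names, the delays are shortened, and kotlinx-coroutines-core is assumed. The example catches a specific exception type (IOException here, chosen arbitrarily) rather than Exception, so that coroutine cancellation is not swallowed by accident.

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for whatever operation may fail in the real flow.
suspend fun riskyCall(): String {
    delay(10)
    throw IOException("boom")
}

// Only the risky call sits inside try; emit stays outside it so that
// exceptions thrown by downstream collectors are not caught here.
fun resilientFlow(): Flow<String> = flow {
    emit("4")
    val next = try {
        riskyCall()
    } catch (e: IOException) {
        "5" // fallback value emitted in place of the failed result
    }
    emit(next)
    emit("6") // still reached, because the failure was handled locally
}

fun collectAll(): List<String> = runBlocking { resilientFlow().toList() }

fun main() {
    println(collectAll()) // [4, 5, 6]
}
```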
Upvotes: 0