Reputation: 3686
I'm struggling to create a 'takeUntilSignal' operator for a Flow - an extension method that will cancel a flow when another flow generates an output.
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>
My initial effort was to try to launch collection of the signal flow in the same coroutine scope as the primary flow collection, and cancel the coroutine scope:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
kotlinx.coroutines.withContext(coroutineContext) {
launch {
signal.take(1).collect()
println("signalled")
cancel()
}
collect {
emit(it)
}
}
}
But this isn't working (and uses the forbidden "withContext" method that is expressly stubbed out by Flow to prevent usage).
edit I've kludged together the following abomination, which doesn't quite fit the definition (resulting flow will only cancel after first emission from primary flow), and I get the feeling there's a far better way out there:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
combine(
signal.map { it as Any? }.onStart { emit(null) }
) { x, y -> x to y }
.takeWhile { it.second == null }
.map { it.first }
edit2 another try, using channelFlow:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
channelFlow {
launch {
signal.take(1).collect()
println("hello!")
close()
}
collect { send(it) }
close()
}
Upvotes: 14
Views: 18848
Reputation: 2719
Check it https://github.com/hoc081098/FlowExt
package com.hoc081098.flowext
import com.hoc081098.flowext.internal.ClosedException
import com.hoc081098.flowext.internal.checkOwnership
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
/**
* Emits the values emitted by the source [Flow] until a [notifier] [Flow] emits a value or completes.
*
* @param notifier The [Flow] whose first emitted value or complete event
* will cause the output [Flow] of [takeUntil] to stop emitting values from the source [Flow].
*/
public fun <T, R> Flow<T>.takeUntil(notifier: Flow<R>): Flow<T> = flow {
try {
coroutineScope {
val job = launch(start = CoroutineStart.UNDISPATCHED) {
notifier.take(1).collect()
throw ClosedException(this@flow)
}
collect { emit(it) }
job.cancel()
}
} catch (e: ClosedException) {
e.checkOwnership(this@flow)
}
}
Upvotes: 3
Reputation: 6148
Use couroutineScope
and start the new coroutine inside:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
try {
coroutineScope {
launch {
signal.take(1).collect()
println("signalled")
[email protected]()
}
collect {
emit(it)
}
}
} catch (e: CancellationException) {
//ignore
}
}
Upvotes: 15