Reputation: 7187
I have a JMS queue that produces messages. I want to share those messages with multiple Kotlin-consumers, but only while a Kotlin-consumer is connected. If a Kotlin-consumer is only active for 5 minutes, it should only receive messages within that window. A Kotlin-consumer should be able to subscribe at any point, and to read the messages it has received at any time.
From reading the docs I think that Kotlin's SharedFlow is the best way to do that...
"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)
but I can't find any good examples, and the docs are very confusing. The SharedFlow docs say "all collectors get all emitted values", and "An active collector of a shared flow is called a subscriber", but they don't explain how to actually create a subscriber.
Options:
- shareIn says it converts "a cold Flow into a hot SharedFlow", but I don't have a cold flow, I have a hot SharedFlow.
- Flow.collect is linked in the docs, but it's marked as internal: "This is an internal kotlinx.coroutines API that should not be used from outside of kotlinx.coroutines."
- launchIn is described as terminal - but I don't want to end consuming

    amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
        override suspend fun emit(value: Message) { ... }
    })

- Flow.collect and launchIn "never complete normally" - but I do want to be able to complete them normally.

Here's how I've tried to subscribe to messages, but I can never get any results.
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
    produceMessages()
    delay(1000)
}

suspend fun produceMessages() = coroutineScope {
    val messages = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    )

    // emit messages
    launch {
        repeat(100000) {
            println("emitting $it - result:${messages.tryEmit(it)}")
            delay(Duration.seconds(0.5))
        }
    }

    println("waiting 3")
    delay(Duration.seconds(3))

    launch {
        messages.onEach { println("onEach") }
    }
    launch {
        messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
    }
    launch {
        messages.collect { println("collect") }
    }
    launch {
        messages.launchIn(this)
        messages.collect { println("launchIn + collect") }
    }
    launch {
        val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
        delay(Duration.seconds(2))
        println("new.replayCache: ${new.replayCache}")
    }
    launch {
        println("sharing")
        val l = mutableListOf<Int>()
        val x = messages.onEach { println("hello") }.launchIn(this)
        repeat(1000) {
            delay(Duration.seconds(1))
            println("result $it: ${messages.replayCache}")
            println("result $it: ${messages.subscriptionCount.value}")
            println("result $it: ${l}")
        }
    }
}
I have a working solution. Thanks go to Tenfour04 for their answer, which helped me understand.
Here's an example close to what I needed.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Publisher {
    private val publishingScope = CoroutineScope(SupervisorJob())

    private val messagesFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    )

    init {
        // emit messages
        publishingScope.launch {
            repeat(100000) {
                println("emitting $it")
                messagesFlow.emit(it)
                delay(500)
            }
        }
    }

    /** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
    fun listen(name: String): SharedFlow<Int> = runBlocking {
        val listenerScope = CoroutineScope(SupervisorJob())
        // replay = Int.MAX_VALUE keeps every forwarded value in replayCache
        val capture = MutableSharedFlow<Int>(
            replay = Int.MAX_VALUE,
            extraBufferCapacity = 0,
            onBufferOverflow = BufferOverflow.SUSPEND
        )
        messagesFlow
            .onEach {
                println("$name is getting message $it")
                capture.emit(it)
            }
            .launchIn(listenerScope)
        capture.asSharedFlow()
    }

    /** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
    suspend fun collectState(name: String): StateFlow<List<Int>> {
        return messagesFlow
            .runningFold(emptyList<Int>()) { acc, value ->
                println("$name is getting message $value")
                acc + value
            }
            .stateIn(publishingScope)
    }
}

fun main() {
    val publisher = Publisher()

    // both Fish and Llama can subscribe at any point, and get all subsequent values
    runBlocking {
        delay(2000)
        launch {
            val listenerFish = publisher.collectState("Fish")
            repeat(4) {
                println("$it. Fish replayCache ${listenerFish.value}")
                delay(2000)
            }
        }
        delay(2000)
        launch {
            val listenerLlama = publisher.listen("Llama")
            repeat(4) {
                println("$it. Llama replayCache" + listenerLlama.replayCache)
                delay(2000)
            }
        }
        delay(10_000)
    }
}
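One difference from my first attempt is worth spelling out: the original code called tryEmit, while this version calls emit. As far as I understand the MutableSharedFlow documentation, tryEmit on a flow with no buffer reports success when nobody is subscribed (the value is simply dropped) and fails as soon as a subscriber exists, because it is not allowed to suspend. The sketch below is only an illustration of that behaviour; the function name tryEmitResults and the second, buffered flow are my own additions.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

/** Returns the tryEmit results for: no subscriber, one subscriber, one subscriber plus a buffer slot. */
fun tryEmitResults(): Triple<Boolean, Boolean, Boolean> = runBlocking {
    val rendezvous = MutableSharedFlow<Int>()                      // replay = 0, extraBufferCapacity = 0
    val buffered = MutableSharedFlow<Int>(extraBufferCapacity = 1) // hypothetical alternative configuration

    // No subscriber yet: the value is dropped, which still counts as a successful emit.
    val withoutSubscriber = rendezvous.tryEmit(1)

    // UNDISPATCHED runs each collector up to its first suspension, so both are subscribed from here on.
    val a = launch(start = CoroutineStart.UNDISPATCHED) { rendezvous.collect { } }
    val b = launch(start = CoroutineStart.UNDISPATCHED) { buffered.collect { } }

    // A subscriber exists but there is no buffer slot, and tryEmit may not suspend to wait for it.
    val withSubscriber = rendezvous.tryEmit(2)
    // With one extra buffer slot the value can be parked there without suspending.
    val withBuffer = buffered.tryEmit(3)

    a.cancel()
    b.cancel()
    Triple(withoutSubscriber, withSubscriber, withBuffer)
}

fun main() = println(tryEmitResults()) // prints (true, false, true)
```

So either call the suspending emit, as above, or give the flow some extraBufferCapacity if the producer has to use tryEmit from non-suspending code such as a listener callback.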
Upvotes: 4
Views: 8229
Reputation: 93902
Flow.collect has an overload that is marked internal, but there is a public collect extension function that is very commonly used. I recommend putting this catch-all import at the top of your file, so that the extension function is available along with the other Flow operators: import kotlinx.coroutines.flow.*
launchIn and collect are the two most common ways to subscribe to a flow. They are both terminal. "Terminal" doesn't mean it ends consuming... it means it starts consuming! A "nonterminal" function is one that wraps a Flow in another Flow without starting to collect it.
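To make the terminal/nonterminal distinction concrete, here is a minimal sketch. The names events, seen and collectedLabels are made up for the illustration. The first onEach is never followed by a terminal operator, so its lambda never runs; the second one is followed by launchIn, which subscribes.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun collectedLabels(): List<String> = runBlocking {
    val events = MutableSharedFlow<Int>()
    val seen = mutableListOf<String>()

    // Nonterminal: onEach only wraps the flow. Nothing subscribes, so this lambda never runs.
    events.onEach { seen += "onEach only: $it" }

    // Terminal: launchIn starts collecting in a new coroutine and returns its Job.
    val job = events.onEach { seen += "launchIn: $it" }.launchIn(this)

    // Wait until the subscription is registered; with replay = 0 an earlier value would be lost.
    events.subscriptionCount.first { it == 1 }
    events.emit(42)

    job.cancelAndJoin() // stop the subscriber so runBlocking can return
    seen
}

fun main() = println(collectedLabels()) // prints [launchIn: 42]
```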
"Never complete normally" means that the code following it in the coroutine will not be reached. collect
subscribes to a flow and suspends the coroutine until the flow is complete. Since a SharedFlow never completes, it "never completes normally".
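If you do want a subscriber that finishes, you have to bound the collection yourself. A rough sketch of two ways to do that, with helper names (firstThree, collectFor) and a ticking publisher that are invented for the example: take(n) ends the collection normally after n values, and withTimeoutOrNull cancels collect when a time window expires and lets the surrounding code carry on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/** Completes normally once three messages have arrived. */
suspend fun firstThree(messages: SharedFlow<Int>): List<Int> =
    messages.take(3).toList()

/** Collects for at most [millis] milliseconds, then returns what was seen in that window. */
suspend fun collectFor(messages: SharedFlow<Int>, millis: Long): List<Int> {
    val seen = mutableListOf<Int>()
    // The timeout cancels collect; withTimeoutOrNull turns that cancellation into a plain null.
    withTimeoutOrNull<Unit>(millis) { messages.collect { seen += it } }
    return seen
}

fun main() = runBlocking {
    val messages = MutableSharedFlow<Int>()

    // Hypothetical publisher standing in for the real message source.
    val publisher = launch {
        var n = 0
        while (true) {
            messages.emit(n++)
            delay(100)
        }
    }

    println("first three: ${firstThree(messages)}")
    println("350 ms window: ${collectFor(messages, 350)}")

    publisher.cancel() // stop the publisher so the program can exit
}
```

For the five-minute consumer in the question, the same pattern would apply with a longer timeout.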
It's hard to comment on your code because it's unusual to start your flow and collect it within the same function. Typically a SharedFlow would be exposed as a property for use by other functions. By combining it all into a single function, you're hiding the fact that a SharedFlow typically publishes from a different coroutine scope than the one it's being collected from.
Here's an example partially adapted from your code:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

class Publisher {
    private val publishingScope = CoroutineScope(SupervisorJob())

    // Exposed as a read-only SharedFlow; only the publisher can emit.
    val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    ).also { flow ->
        // emit messages
        publishingScope.launch {
            repeat(100000) {
                println("emitting $it")
                flow.emit(it)
                delay(500)
            }
        }
    }
}

fun main() {
    val publisher = Publisher()
    runBlocking {
        val subscribingScope = CoroutineScope(SupervisorJob())

        // Delay a while. We'll miss the first couple messages.
        delay(1300)

        // Subscribe to the shared flow
        subscribingScope.launch {
            publisher.messagesFlow.collect { println("I am collecting message $it") }
            // Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
        }

        delay(3000) // Keep app alive for a while
    }
}
Since collect typically prevents any code below it from running in the coroutine, the launchIn function can make it a little more obvious what's happening, and more concise:
fun main() {
    val publisher = Publisher()
    runBlocking {
        val subscribingScope = CoroutineScope(SupervisorJob())
        delay(1300)
        publisher.messagesFlow.onEach { println("I am collecting message $it") }
            .launchIn(subscribingScope)
        delay(3000)
    }
}
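The launchIn documentation describes it as shorthand for scope.launch { flow.collect() }, so both forms hand you back a Job, and cancelling that Job is how a subscriber disconnects. A small sketch under that assumption (the function name subscriberCounts and the flow in it are made up here) uses subscriptionCount to show two subscribers connecting and then leaving:

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

/** Returns the subscriber count while both collectors are active, and after both are cancelled. */
fun subscriberCounts(): Pair<Int, Int> = runBlocking {
    val messages = MutableSharedFlow<Int>()

    // Two equivalent ways to subscribe; each returns a Job.
    val viaLaunch = launch { messages.collect { println("launch + collect got $it") } }
    val viaLaunchIn = messages.onEach { println("launchIn got $it") }.launchIn(this)

    // Suspend until both subscriptions are registered.
    val whileSubscribed = messages.subscriptionCount.first { it == 2 }
    messages.emit(1)

    // Cancelling the Job unsubscribes; the publisher is unaffected.
    viaLaunch.cancelAndJoin()
    viaLaunchIn.cancelAndJoin()

    whileSubscribed to messages.subscriptionCount.value
}

fun main() = println(subscriberCounts()) // last line printed is (2, 0)
```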
Upvotes: 5