aSemy

Reputation: 7187

Kotlin SharedFlow - how to subscribe?

I have a JMS queue that produces messages. I want to share those messages with multiple Kotlin-consumers, but only while a Kotlin-consumer is connected. If a Kotlin-consumer is only active for 5 minutes, it should only receive messages within that window. A Kotlin-consumer should be able to subscribe at any point, and to read the messages it has received so far at any time.

From reading the docs I think that Kotlin's SharedFlow is the best way to do that...

"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)

but I can't find any good examples, and the docs are very confusing. The SharedFlow docs say "all collectors get all emitted values", and "An active collector of a shared flow is called a subscriber", but they don't explain how to actually create a subscriber.

Options:

amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
  override suspend fun emit(value: Message) { ... }
})

Here's how I've tried to subscribe to messages, but I can never get any results.


import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
  produceMessages()
  delay(1000)
}

suspend fun produceMessages() = coroutineScope {

  val messages = MutableSharedFlow<Int>(
    replay = 0,
    extraBufferCapacity = 0,
    onBufferOverflow = BufferOverflow.SUSPEND
  )

  // emit messages
  launch {
    repeat(100000) {
      println("emitting $it - result:${messages.tryEmit(it)}")
      delay(Duration.seconds(0.5))
    }
  }

  println("waiting 3")
  delay(Duration.seconds(3))

  launch {
    messages.onEach { println("onEach") }
  }
  launch {
    messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
  }
  launch {
    messages.collect { println("collect") }
  }

  launch {
    messages.launchIn(this)
    messages.collect { println("launchIn + collect") }
  }

  launch {
    val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
    delay(Duration.seconds(2))
    println("new.replayCache: ${new.replayCache}")
  }

  launch {
    println("sharing")
    val l = mutableListOf<Int>()
    val x = messages.onEach { println("hello") }.launchIn(this)

    repeat(1000) {
      delay(Duration.seconds(1))

      println("result $it: ${messages.replayCache}")
      println("result $it: ${messages.subscriptionCount.value}")
      println("result $it: ${l}")
    }
  }
}

Update

I have a working solution. Thanks go to Tenfour04 for their answer, which helped me understand.

Here's an example close to what I needed.

import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Publisher {
  private val publishingScope = CoroutineScope(SupervisorJob())

  private val messagesFlow = MutableSharedFlow<Int>(
    replay = 0,
    extraBufferCapacity = 0,
    onBufferOverflow = BufferOverflow.SUSPEND
  )

  init {
    // emit messages
    publishingScope.launch {
      repeat(100000) {
        println("emitting $it")
        messagesFlow.emit(it)
        delay(Duration.seconds(0.5))
      }
    }
  }

  /** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
  fun listen(name: String): SharedFlow<Int> = runBlocking {

    val listenerScope = CoroutineScope(SupervisorJob())

    val capture = MutableSharedFlow<Int>(
      replay = Int.MAX_VALUE,
      extraBufferCapacity = 0,
      onBufferOverflow = BufferOverflow.SUSPEND
    )

    messagesFlow
      .onEach {
        println("$name is getting message $it")
        capture.emit(it)
      }
      .launchIn(listenerScope)

    capture.asSharedFlow()
  }

  /** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
  suspend fun collectState(name: String): StateFlow<List<Int>> {
    return messagesFlow
      .runningFold(emptyList<Int>()) { acc, value ->
        println("$name is getting message $value")
        acc + value
      }
      .stateIn(publishingScope)
  }

}

fun main() {
  val publisher = Publisher()

  // Both Fish and Llama can subscribe at any point, and get all subsequent values.

  runBlocking {
    delay(Duration.seconds(2))

    launch {
      val listenerFish = publisher.collectState("Fish")
      repeat(4) {
        println("$it. Fish replayCache ${listenerFish.value}")
        delay(Duration.seconds(2))
      }
    }

    delay(Duration.seconds(2))

    launch {
      val listenerLlama = publisher.listen("Llama")
      repeat(4) {
        println("$it. Llama replayCache" + listenerLlama.replayCache)
        delay(Duration.seconds(2))
      }
    }

    delay(Duration.seconds(10))
  }
}

Upvotes: 4

Views: 8229

Answers (1)

Tenfour04

Reputation: 93902

Flow.collect has an overload that is marked internal, but there is also a public collect extension function that is very commonly used. I recommend putting this catch-all import at the top of your file, so that the extension function is available along with the other Flow-related functions: import kotlinx.coroutines.flow.*

launchIn and collect are the two most common ways to subscribe to a flow. They are both terminal. "Terminal" doesn't mean the function stops consuming; it means it starts consuming! A "nonterminal" function (the Kotlin docs call these intermediate operators), such as onEach, wraps a Flow in another Flow without starting to collect it.
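To make the terminal/nonterminal distinction concrete, here is a minimal sketch. The function name collectTwo and the variable names are made up for illustration, and take(2) is only there so that the otherwise endless shared flow completes and the demo can finish. The onEach call on its own builds a new Flow that nothing collects, so its lambda never runs; toList() is a terminal operator, so it actually subscribes.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values seen by a real subscriber,
// plus how often the lambda of the never-collected onEach flow ran.
suspend fun collectTwo(): Pair<List<Int>, Int> = coroutineScope {
    val events = MutableSharedFlow<Int>()
    var onEachOnlyCalls = 0

    // Nonterminal only: this wraps the flow but nobody collects the result,
    // so no subscription is created and the lambda is never invoked.
    events.onEach { onEachOnlyCalls++ }

    // Terminal: toList() collects. take(2) ends the collection after two values.
    val received = async { events.take(2).toList() }

    // Wait until the subscriber above is registered before emitting,
    // because a SharedFlow with replay = 0 drops values nobody is listening for.
    events.subscriptionCount.first { it == 1 }
    events.emit(1)
    events.emit(2)

    received.await() to onEachOnlyCalls
}

fun main() = runBlocking {
    val (received, calls) = collectTwo()
    println("received=$received, onEach-only calls=$calls")
}
```

This is also why the first launch block in the question (onEach with nothing after it) prints nothing: it never reaches a terminal function.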

"Never complete normally" means that the code following it in the coroutine will not be reached. collect subscribes to a flow and suspends the coroutine until the flow is complete. Since a SharedFlow never completes, a call to collect on it "never completes normally"; it only ends when the collecting coroutine is cancelled.
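As a rough sketch of what that means in practice, the example below collects a shared flow for a limited window and then moves on, which is close to the "consumer that is only active for 5 minutes" case in the question. The function name collectForWindow, the 350 ms window and the 100 ms emit interval are arbitrary values chosen for illustration. withTimeoutOrNull cancels the collection when the window closes; the line after collect is never executed (the compiler may even warn that it is unreachable).

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: subscribes for a short window and returns a log of what happened.
suspend fun collectForWindow(): List<String> = coroutineScope {
    val messages = MutableSharedFlow<Int>()
    val log = mutableListOf<String>()

    // Stand-in producer: emits an increasing counter forever.
    val producer = launch {
        var i = 0
        while (true) {
            messages.emit(i++)
            delay(100)
        }
    }

    // collect on a SharedFlow never returns by itself; the timeout cancels it.
    withTimeoutOrNull<Unit>(350) {
        messages.collect { log += "got $it" }
        log += "unreachable" // never runs: collect does not complete normally
    }

    // Execution continues here once the window has closed.
    log += "after window"
    producer.cancel()
    log
}

fun main() = runBlocking {
    collectForWindow().forEach(::println)
}
```

Which messages arrive inside the window depends on timing, so no exact output is shown here; the point is only that "unreachable" is never logged while "after window" is.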

It's hard to comment on your code because it's unusual to start your flow and collect it within the same function. Typically a SharedFlow would be exposed as a property for use by other functions. By combining it all into a single function you're hiding the fact that a SharedFlow is typically published from a different coroutine scope than the one it's being collected from.

Here's an example partially adapted from your code:

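import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

class Publisher {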
    private val publishingScope = CoroutineScope(SupervisorJob())

    val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    ).also { flow ->
        // emit messages
        publishingScope.launch {
            repeat(100000) {
                println("emitting $it")
                flow.emit(it)
                delay(500)
            }
        }
    }
}

fun main() {
    val publisher = Publisher()
    runBlocking {
        val subscribingScope = CoroutineScope(SupervisorJob())

        // Delay a while. We'll miss the first couple messages.
        delay(1300)

        // Subscribe to the shared flow
        subscribingScope.launch {
            publisher.messagesFlow.collect { println("I am collecting message $it") }
            // Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
        }

        delay(3000) // Keep app alive for a while
    }
}

Since collect typically prevents any code below it from running in the coroutine, the launchIn function can make it a little more obvious what's happening, and more concise:

fun main() {
    val publisher = Publisher()
    runBlocking {
        val subscribingScope = CoroutineScope(SupervisorJob())

        delay(1300)
        
        publisher.messagesFlow.onEach { println("I am collecting message $it") }
            .launchIn(subscribingScope)

        delay(3000)
    }
}
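launchIn also returns the Job of the collecting coroutine, so a subscriber can leave again by cancelling that Job. The sketch below is only an illustration under that assumption: subscribeThenCancel and the variable names are invented for the example. It watches subscriptionCount on the MutableSharedFlow go from 0 to 1 and back to 0.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the subscriber count before subscribing,
// while subscribed, and after the subscription has been cancelled.
suspend fun subscribeThenCancel(): List<Int> {
    val messages = MutableSharedFlow<Int>()
    val subscribingScope = CoroutineScope(SupervisorJob())
    val counts = mutableListOf<Int>()

    counts += messages.subscriptionCount.value // nobody is subscribed yet

    // Subscribe: launchIn starts collecting and hands back the Job.
    val subscription = messages
        .onEach { println("received $it") }
        .launchIn(subscribingScope)

    // Suspend until the subscription is visible, then record it.
    counts += messages.subscriptionCount.first { it == 1 }

    // Unsubscribe by cancelling only this subscriber's Job.
    subscription.cancelAndJoin()
    counts += messages.subscriptionCount.first { it == 0 }

    subscribingScope.cancel()
    return counts
}

fun main() = runBlocking {
    println(subscribeThenCancel()) // prints [0, 1, 0]
}
```

Cancelling the whole subscribingScope would have the same effect for every subscriber launched in it, which is convenient when a consumer's lifetime is tied to one scope.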

Upvotes: 5
