viad899

Reputation: 43

Kotlin coroutines: periodically emit data and check the subscriber count

I have a server with an RSocket service on Spring Boot:

@MessageMapping("get-messages")
fun getMessageById(): Flow<Set<Message>> {
    return flow { emit(service.getLatestMessages()) }
}

Because the repository is not reactive, I want to query the database periodically and push the data to subscribers, if there are any.

I want to use a StateFlow like this:

private val stateFlowProducts = MutableStateFlow<Set<Message>>(emptySet())

init {
    CoroutineScope(Dispatchers.IO).launch {
        while (true) {
            // Refresh only while at least one subscriber is collecting.
            if (stateFlowProducts.subscriptionCount.value > 0) {
                stateFlowProducts.value = service.getLatestMessages()
            }
            delay(6 * 1000)
        }
    }
}

However, the subscriber count is always 0. Also, I suspect that "while" with "delay" is not best practice?

Upvotes: 1

Views: 1275

Answers (1)

Each operator in the pipeline transforms the stream of subscriptionCount values as follows:

0. subscriptionCount: `0      1     2     0      2`
1. Map to true/false: `false  true  true  false  true`
2. Distinct         : `false  true        false  true`
3. Filter           : `       true               true`
4. MapLatest        : `       list               list`

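stateFlowProducts.subscriptionCount
    .map { it > 0 }                             // is there at least one subscriber?
    .distinctUntilChanged()                     // react only when that answer changes
    .filter { it }                              // keep only the "subscribers appeared" events
    .mapLatest { service.getLatestMessages() }  // fetch fresh data for the new subscribers
    .onEach { stateFlowProducts.value = it }    // publish it through the StateFlow
    .launchIn(scope)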
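
To try the pipeline above in isolation, here is a minimal, self-contained sketch. `FakeService` and `demoRefreshOnFirstSubscriber` are hypothetical names made up for the demo, and `Set<String>` stands in for `Set<Message>`. It also hints at why the count may stay at 0 in the question: `subscriptionCount` only counts collectors of `stateFlowProducts` itself, so the sketch assumes the `get-messages` handler returns that flow rather than a separate `flow { ... }`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the blocking service from the question.
class FakeService {
    var calls = 0
        private set

    fun getLatestMessages(): Set<String> = setOf("message-${++calls}")
}

// Returns (service calls before any subscriber, service calls after the first one).
@OptIn(ExperimentalCoroutinesApi::class)
fun demoRefreshOnFirstSubscriber(): Pair<Int, Int> = runBlocking {
    val service = FakeService()
    val stateFlowProducts = MutableStateFlow<Set<String>>(emptySet())

    // Same operator chain as in the answer, launched in this runBlocking scope.
    val refresher = stateFlowProducts.subscriptionCount
        .map { it > 0 }
        .distinctUntilChanged()
        .filter { it }
        .mapLatest { service.getLatestMessages() }
        .onEach { stateFlowProducts.value = it }
        .launchIn(this)

    delay(100)
    val before = service.calls // nobody collects stateFlowProducts yet

    // Only a collector of stateFlowProducts itself raises subscriptionCount.
    val subscriber = launch { stateFlowProducts.collect { println(it) } }
    delay(100)
    val after = service.calls

    subscriber.cancel()
    refresher.cancel()
    before to after
}
```

Calling `demoRefreshOnFirstSubscriber()` from a `main` function should show that the service is left alone until the first collector attaches.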

https://pl.kotl.in/nQBp0zW6s
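
Note that the pipeline above fetches once each time the subscriber count goes from zero to non-zero. If the data should also be refreshed periodically while subscribers stay connected, as the question asks, one possible variation (not what the answer shows) is to swap `filter` and `mapLatest` for `transformLatest` with a loop inside. This is only a sketch: `CountingService`, `pollWhileSubscribed`, `demoPolling` and the 50 ms period are assumptions chosen for the demo, and `Set<String>` stands in for `Set<Message>`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the blocking, non-reactive service in the question.
class CountingService {
    var calls = 0
        private set

    fun getLatestMessages(): Set<String> = setOf("message-${++calls}")
}

// Re-fetches every periodMs while `state` has collectors and idles otherwise.
@OptIn(ExperimentalCoroutinesApi::class)
fun pollWhileSubscribed(
    state: MutableStateFlow<Set<String>>,
    service: CountingService,
    periodMs: Long,
    scope: CoroutineScope
): Job = state.subscriptionCount
    .map { it > 0 }
    .distinctUntilChanged()
    .transformLatest { hasSubscribers ->
        // A new true/false value cancels this block, which stops the loop.
        while (hasSubscribers) {
            emit(service.getLatestMessages())
            delay(periodMs)
        }
    }
    .onEach { state.value = it }
    .launchIn(scope)

// Returns (calls made around the subscribed period, extra calls once idle).
fun demoPolling(): Pair<Int, Int> = runBlocking {
    val service = CountingService()
    val state = MutableStateFlow<Set<String>>(emptySet())
    val poller = pollWhileSubscribed(state, service, periodMs = 50, scope = this)

    val subscriber = launch { state.collect { } }
    delay(180)                  // stay subscribed for a few polling periods
    subscriber.cancelAndJoin()
    delay(100)                  // let the poller observe zero subscribers
    val whileSubscribed = service.calls

    delay(200)                  // no subscribers: the count should not move
    val afterUnsubscribe = service.calls - whileSubscribed

    poller.cancel()
    whileSubscribed to afterUnsubscribe
}
```

In a real service the blocking repository call would probably be wrapped in `withContext(Dispatchers.IO)`, and the scope passed in should be one that is cancelled when the component shuts down.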

Upvotes: 2
