Reputation: 43
I have server with a rSocket service on the spring boot:
@MessageMapping("get-messages")
fun getMessageById(): Flow<Set<Message>> {
return flow { emit(service.getLatestMessages()) }
}
Because the repo is not reactive, i want periodically go to the database for data and give it to subscribers, if exist.
I want use StateFlow like this:
private val stateFlowMessages = MutableStateFlow<Set<Message>>(emptySet())
init {
CoroutineScope(Dispatchers.IO).launch {
while(true){
if (stateFlowProducts.subscriptionCount.value > 0)
stateFlowProducts.value = service.getLatestMessages()
delay(6 * 1000)
}
}
}
but subscribers always 0 and i think "while" with "delay" not a best practice?
Upvotes: 1
Views: 1275
Reputation: 2719
0. subscriptionCount: `0 1 2 0 2`
1. Map to true/false: `false true true false true`
2. Distinct. : `false true false true`
3. Filter. : ` true true`
3. MapLatest. : ` list list`.
stateFlowProducts.subscriptionCount
.map { it > 0 }
.distinctUntilChanged()
.filter { it }
.mapLatest { service.getLatestMessages() }
.onEach { stateFlowProducts.value = it }
.launchIn(scope)
Upvotes: 2