Omkar Amberkar
Omkar Amberkar

Reputation: 2452

How to create a polling mechanism with kotlin coroutines?

I am trying to create a polling mechanism with kotlin coroutines using sharedFlow and want to stop when there are no subscribers and active when there is at least one subscriber. My question is, is sharedFlow the right choice in this scenario or should I use channel. I tried using channelFlow but I am unaware how to close the channel (not cancel the job) outside the block body. Can someone help? Here's the snippet.

 fun poll(id: String) = channelFlow {
            while (!isClosedForSend) {
                try {
                    send(repository.getDetails(id))
                    delay(MIN_REFRESH_TIME_MS)
                } catch (throwable: Throwable) {
                    Timber.e("error -> ${throwable.message}")
                }
                invokeOnClose { Timber.e("channel flow closed.") }
        }
    } 

Upvotes: 4

Views: 4972

Answers (2)

J.Grbo
J.Grbo

Reputation: 475

You can use SharedFlow which emits values in a broadcast fashion (won't emit new value until the previous one is consumed by all the collectors).

val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job()

scope.launch {
    val producer = launch() {
            sharedFlow.emit(...)
    }

    sharedFlow.subscriptionCount
              .map {count -> count > 0}
              .distinctUntilChanged()
              .collect { isActive -> if (isActive) stopProducing() else startProducing()
}

fun CoroutineScope.startProducing() {
    producer = launch() {
        sharedFlow.emit(...)
    }
        
}

fun stopProducing() {
    producer.cancel()
}

Upvotes: 2

R.h
R.h

Reputation: 70

First of all, when you call channelFlow(block), there is no need to close the channel manually. The channel will be closed automatically after the execution of block is done.

I think the "produce" coroutine builder function may be what you need. But unfortunately, it's still an experimental api.

fun poll(id: String) = someScope.produce {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val channel = poll("hello")

    channel.receive()

    channel.cancel()
}

The produce function will suspended when you don't call the returned channel's receive() method, so there is no need to delay.

UPDATE: Use broadcast for sharing values across multiple ReceiveChannel.

fun poll(id: String) = someScope.broadcast {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val broadcast = poll("hello")

    val channel1 = broadcast.openSubscription()
    val channel2 = broadcast.openSubscription()
    
    channel1.receive()
    channel2.receive()

    broadcast.cancel()
}

Upvotes: 0

Related Questions