svkaka

Reputation: 4022

Coroutines Channel: consumeEach waits for all values instead of consuming them one at a time

I am having trouble figuring out how to use Channels. I expect a channel to push a value to the consumer immediately after send is called; instead, I only get values after the data from both sources has been loaded.

MainActivity.kt

fun loadData() {
    textView.text = "LOADING"
    launch {
        repository.loadData().consumeEach { loaded ->
            withContext(Dispatchers.Main) {
                logd("Presenting: ${loaded.size}, $loaded")
                textView.text = loaded.joinToString { "$it\n" }
            }
        }
    }
}

Repository.kt

suspend fun loadData(): ReceiveChannel<List<String>> {
    return coroutineScope {
        produce(capacity = 2) {
            launch {
                val localData = local.loadData()
                send(localData)
            }
            launch {
                val remoteData = remote.loadData()
                send(remoteData)
            }
        }
    }
}

Remote.kt

override val data: MutableList<String> = mutableListOf("R1", "R2", "R3", "R4", "R5")

override suspend fun loadData(): List<String> {
    logd("Loading remote started")
    val wait = Random.nextLong(0, 500)
    delay(wait)
    logd("Remote loading took $wait")
    logd("Loading remote finished: ${data.size}, $data")
    return data
}

Local.kt

override val data: MutableList<String> = mutableListOf("L1", "L2", "L3", "L4", "L5")

override suspend fun loadData(): List<String> {
    logd("Loading local started")
    val wait = Random.nextLong(1000, 2000)
    delay(wait)
    logd("Local loading took $wait")
    logd("Loading local finished: ${data.size}, $data")
    return data
}

I get this in the console:

D/Local: Loading local started
D/Remote: Loading remote started
D/Remote: Remote loading took 265
D/Remote: Loading remote finished: 5, [R1, R2, R3, R4, R5]
D/Local: Local loading took 1650
D/Local: Loading local finished: 5, [L1, L2, L3, L4, L5]
D/DispatchedCoroutine: Presenting: 5, [R1, R2, R3, R4, R5]
D/DispatchedCoroutine: Presenting: 5, [L1, L2, L3, L4, L5]

It looks as if the data from both sources is only emitted once the capacity is reached. I would expect the consumer to receive each value right after it is sent, so that the console output looks more like this:

D/Local: Loading local started
D/Remote: Loading remote started
D/Remote: Remote loading took 265
D/DispatchedCoroutine: Presenting: 5, [R1, R2, R3, R4, R5]
D/Remote: Loading remote finished: 5, [R1, R2, R3, R4, R5]
D/Local: Local loading took 1650
D/Local: Loading local finished: 5, [L1, L2, L3, L4, L5]
D/DispatchedCoroutine: Presenting: 5, [L1, L2, L3, L4, L5]

How do I achieve this (consuming values immediately after they are sent) using a coroutines Channel?


EDIT #1:

After removing coroutineScope{...} from Repository#loadData(), it started working as expected. But now I have to pass the scope as a function parameter, which looks very ugly to me.

Repository.kt

suspend fun loadData(scope: CoroutineScope): ReceiveChannel<List<String>> {
    return scope.produce(capacity = 2) {
        launch {
            val localData = local.loadData()
            send(localData)
        }
        launch {
            val remoteData = remote.loadData()
            send(remoteData)
        }
        invokeOnClose {
            logd("Closing channel")
        }
    }
}
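
A minimal sketch of why removing coroutineScope{...} changes the timing: coroutineScope suspends until every coroutine started inside it has completed, so a channel returned from it only reaches the caller once both producers are done. The names load, produceBoth and trace are hypothetical and not part of the code above, and the fixed delays stand in for the real local and remote sources. The extension receiver on produceBoth is also one possible way to avoid the explicit scope parameter.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for local.loadData() / remote.loadData().
suspend fun load(name: String, ms: Long, events: MutableList<String>): String {
    delay(ms)
    events += "loaded $name"
    return name
}

// Extension on CoroutineScope: the producer runs in the caller's scope,
// so no scope has to be passed as an ordinary parameter.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceBoth(events: MutableList<String>): ReceiveChannel<String> =
    produce(capacity = 2) {
        launch { send(load("remote", 100, events)) }
        launch { send(load("local", 300, events)) }
    }

// Records the order of events with and without the coroutineScope wrapper.
suspend fun trace(useCoroutineScope: Boolean): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val channel =
        if (useCoroutineScope) coroutineScope { produceBoth(events) } // waits for both producers
        else produceBoth(events)                                      // returns immediately
    events += "channel returned"
    for (item in channel) events += "received $item"
    events
}

fun main() = runBlocking {
    println(trace(useCoroutineScope = true))
    println(trace(useCoroutineScope = false))
}
```

Run inside runBlocking, this should print [loaded remote, loaded local, channel returned, received remote, received local] for the wrapped variant and [channel returned, loaded remote, received remote, loaded local, received local] for the extension variant. With the wrapper, the buffer of capacity 2 lets both sends complete before anyone receives, which matches the log in the question.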

Upvotes: 3

Views: 1652

Answers (1)

Laurence

Reputation: 1676

I think your code is fine for what you expect it to do. I suspect the problem is that the log lines do not reach your console at the moment they are produced; remember that logging has its own buffering and IO threads to travel through. I tried your code with println instead and got the behaviour you expect.

To confirm this, replace the random waits with waits of tens of seconds each, so that the two loads clearly finish one after the other. To help you check for yourself, here is my non-Android version of what you are trying to do:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    launch(Dispatchers.Unconfined) {
        loadData().consumeEach { loaded ->
            println("Presenting: ${loaded.size}, $loaded")
        }
    }.join()
    println("The whole thing took ${System.currentTimeMillis() - start}")
}

suspend fun CoroutineScope.loadData() = produce {
    launch {
        val localData = localloadData()
        send(localData)
    }
    launch {
        val remoteData = remoteloadData()
        send(remoteData)
    }
}

val remoteData: MutableList<String> = mutableListOf("R1", "R2", "R3", "R4", "R5")

suspend fun remoteloadData(): List<String> {
    println("Loading remote started")
    val wait = 500L
    delay(wait)
    println("Remote loading took $wait")
    println("Loading remote finished: ${remoteData.size}, $remoteData")
    return remoteData
}

val localData: MutableList<String> = mutableListOf("L1", "L2", "L3", "L4", "L5")

suspend fun localloadData(): List<String> {
    println("Loading local started")
    val wait = 1000L
    delay(wait)
    println("Local loading took $wait")
    println("Loading local finished: ${localData.size}, $localData")
    return localData
}

And it produces this:

Loading local started
Loading remote started
Remote loading took 500
Loading remote finished: 5, [R1, R2, R3, R4, R5]
Presenting: 5, [R1, R2, R3, R4, R5]
Local loading took 1000
Loading local finished: 5, [L1, L2, L3, L4, L5]
Presenting: 5, [L1, L2, L3, L4, L5]
The whole thing took 1046

EDIT: I removed the withContext(Dispatchers.Main) that you used when updating the value; it is not needed, because the asynchronous work is already done by that point. Instead, specify the context in your top-level launch, as the code above now does.

The rest of the work below this should inherit that context unless you specify otherwise. No need to keep passing the context as an argument.

If you do find yourself in a situation where another context has to supersede the inherited one, passing it as an argument is one way to do it, but I would prefer to find a workaround and express the code so that it inherits from the calling context.
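
A small sketch of the inheritance described above, assuming a plain JVM project where Dispatchers.Main is not available: a CoroutineName element stands in for the dispatcher, since both are context elements and are inherited the same way. The function namesSeen and the names "top" and "override" are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Collects the CoroutineName visible at each nesting level.
suspend fun namesSeen(): List<String?> = coroutineScope {
    val seen = mutableListOf<String?>()
    launch(CoroutineName("top")) {            // context set once, at the top launch
        seen += coroutineContext[CoroutineName]?.name
        launch {                              // child inherits the parent's context
            seen += coroutineContext[CoroutineName]?.name
            withContext(CoroutineName("override")) {  // explicit local override
                seen += coroutineContext[CoroutineName]?.name
            }
        }.join()
    }.join()
    seen
}

fun main() = runBlocking {
    println(namesSeen()) // [top, top, override]
}
```

The nested launch sees the same element as its parent without anything being passed as an argument; only the withContext block replaces it, and only for its own body.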

Upvotes: 1
