Peter

Reputation: 211

Kotlin wrap sequential IO calls as a Sequence

I need to process all of the results from a paged API endpoint. I'd like to present all of the results as a sequence.

I've come up with the following (slightly pseudo-coded):

suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
    var currentRequest: Request? = client.requestForNextPage()
    return withContext(Dispatchers.IO) {
        sequence {
            while (currentRequest != null) {
                val rowsInPage = runBlocking { client.makeRequest(currentRequest) }
                currentRequest = client.requestForNextPage()
                yieldAll(rowsInPage)
            }
        }
    }
}

This works, but I'm not sure about a couple of things:

  1. Does the API request inside runBlocking still run on the IO dispatcher?
  2. Is there a way to refactor the code to launch the next request before yielding the current results, and then await it later?

Upvotes: 2

Views: 2050

Answers (3)

Peter

Reputation: 211

I happened across suspendingSequence in Kotlin's coroutines-examples:

https://github.com/Kotlin/coroutines-examples/blob/090469080a974b962f5debfab901954a58a6e46a/examples/suspendingSequence/suspendingSequence.kt

This is exactly what I was looking for.
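For comparison: suspendingSequence is a sample in the coroutines-examples repository, not part of the kotlinx.coroutines library. With kotlinx.coroutines, a similar pull-style loop over paged results can be sketched with a channel built by produce. This is a minimal sketch, not the sample's own code; fetchPage and its three fake pages are hypothetical stand-ins for the question's Client, and produce is marked experimental, hence the opt-in.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical page source standing in for the question's Client:
// page i holds the rows i*10 and i*10+1, and only pages 0..2 exist.
suspend fun fetchPage(index: Int): List<Int>? {
    delay(10) // simulates a non-blocking network call
    return if (index < 3) List(2) { index * 10 + it } else null
}

// Producer coroutine: fetches pages one after another and sends their rows.
// capacity = 1 lets it run a little ahead of the consumer before send() suspends.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.pagedRows(): ReceiveChannel<Int> = produce(capacity = 1) {
    var index = 0
    while (true) {
        val page = fetchPage(index++) ?: break // null means there are no more pages
        for (row in page) send(row)
    }
}

// Consumer: an ordinary for loop that suspends between elements,
// similar in spirit to iterating a suspending sequence.
suspend fun collectAllRows(): List<Int> = coroutineScope {
    val rows = mutableListOf<Int>()
    for (row in pagedRows()) rows += row
    rows
}
```

Calling runBlocking { collectAllRows() } returns [0, 1, 10, 11, 20, 21]. The channel is closed automatically when the produce block finishes, which ends the for loop.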

Upvotes: 1

ardenit

Reputation: 3890

Sequences are definitely not what you want here, because they are not designed to work in an asynchronous environment. Flows and channels are worth a look, but for your case the best and simplest choice is just a collection of deferred values, because you want to process all requests at once (flows and channels process them one by one, possibly with a limited buffer).

The following approach starts all requests asynchronously (assuming that makeRequest is a suspending function and supports asynchronous requests). When you need the results, you only have to wait for the slowest request to finish.

import kotlinx.coroutines.*

fun getClientRequests(client: Client): List<Request> {
    val requests = ArrayList<Request>()
    var currentRequest: Request? = client.requestForNextPage()
    while (currentRequest != null) {
        requests += currentRequest
        currentRequest = client.requestForNextPage()
    }
    return requests
}

// This is not even a suspend function, so it returns almost immediately
fun getAllRowsFromAPI(client: Client): List<Deferred<Page>> =
    getClientRequests(client).map {
        /*
         * The better practice would be making getAllRowsFromAPI an extension function
         * on CoroutineScope and calling the receiver scope's async function.
         * GlobalScope is used here just for simplicity.
         */
        GlobalScope.async(Dispatchers.IO) { client.makeRequest(it) }
    }

fun main() {
    val client = Client()
    val deferredPages = getAllRowsFromAPI(client) // This line executes fast
    // Here you can do whatever you want, all requests are processed in background
    Thread.sleep(999L)
    // Then, when we need results....
    val pages = runBlocking {
        // awaitAll waits for every request and keeps the pages in request order
        deferredPages.awaitAll()
    }
    println(pages)
    // In your case you also want to "unpack" pages and get rows, you can do it here:
    val rows = pages.flatMap { it.getRows() }
    println(rows)
}
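The comment in getAllRowsFromAPI suggests making it a CoroutineScope extension instead of using GlobalScope. A minimal sketch of that variant, assuming a hypothetical FakeClient whose fetchPage simulates a slow request; fetchAllPagesAsync and getAllRows are illustrative names, not part of any library.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the question's Client; fetchPage simulates a slow request.
class FakeClient {
    suspend fun fetchPage(request: Int): List<String> {
        delay(100) // non-blocking wait standing in for network latency
        return listOf("row $request.a", "row $request.b")
    }
}

// Starts one request per page in the receiver scope instead of GlobalScope,
// so the requests are cancelled together if the caller's scope is cancelled.
fun CoroutineScope.fetchAllPagesAsync(
    client: FakeClient,
    requests: List<Int>
): List<Deferred<List<String>>> =
    requests.map { request ->
        async(Dispatchers.IO) { client.fetchPage(request) } // IO in case the real call blocks
    }

// coroutineScope waits for every child; awaitAll keeps the results in request order.
suspend fun getAllRows(client: FakeClient, requests: List<Int>): List<String> =
    coroutineScope {
        fetchAllPagesAsync(client, requests).awaitAll().flatten()
    }
```

runBlocking { getAllRows(FakeClient(), listOf(1, 2, 3)) } returns the six rows in request order, and the three simulated requests run concurrently rather than one after another.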

Upvotes: 0

marstran

Reputation: 28056

Question 1: Not as written. sequence { } is lazy, so withContext(Dispatchers.IO) only creates the Sequence object and returns it; the loop body, including runBlocking, runs later on whichever thread iterates the sequence. runBlocking itself runs its block on the current thread rather than switching dispatchers. Wherever it ends up running, runBlocking blocks that thread, so no other tasks can be scheduled on it while waiting for the request to finish. There's rarely a good reason to use runBlocking like this in production code, because:

  1. If makeRequest is already a blocking call, runBlocking does practically nothing.
  2. If makeRequest is a suspending call, runBlocking makes the code less efficient: it doesn't yield the thread back to the pool while waiting for the request to finish.

Whether makeRequest is a blocking or non-blocking call depends on the client you're using. Here's a non-blocking HTTP client I can recommend: https://ktor.io/clients/
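Because sequence { } is lazy, where its body runs depends on who iterates it. This sketch mirrors the shape of the question's code without any Client: it builds a sequence inside withContext(Dispatchers.IO) and records which thread builds the sequence and which thread runs its body. ThreadReport and traceSequenceThreads are hypothetical names for illustration.

```kotlin
import kotlinx.coroutines.*

// Threads involved when a sequence is built inside withContext(Dispatchers.IO).
data class ThreadReport(val caller: Thread, val builtOn: Thread, val bodyRanOn: Thread)

fun traceSequenceThreads(): ThreadReport = runBlocking {
    val caller = Thread.currentThread()
    var builtOn: Thread? = null
    val seq: Sequence<Thread> = withContext(Dispatchers.IO) {
        builtOn = Thread.currentThread() // this line runs on an IO worker thread
        sequence {
            // Not executed here: the builder only runs when the sequence is iterated
            yield(Thread.currentThread())
        }
    }
    // Iterating after withContext has returned runs the body on the caller's thread
    val bodyRanOn = seq.first()
    ThreadReport(caller, builtOn!!, bodyRanOn)
}
```

In the resulting report, builtOn is an IO worker thread while bodyRanOn is the same thread as caller, so any blocking call inside the sequence body would block the consumer's thread.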

Question 2: I would use a Flow for this purpose. You can think of it as a suspending variant of Sequence. Flows are cold, which means the producer code doesn't run until a consumer collects the flow (as opposed to hot streams, where the producer pushes new values whether or not a consumer wants them). Flow has an operator called buffer that lets the producer run ahead and request more pages before the consumer has fully processed the previous page.
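The cold behavior can be seen in a few lines: the producer block only runs once collect is called. coldFlowEvents is a hypothetical helper for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order of events to show that a flow's producer is lazy.
fun coldFlowEvents(): List<String> {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "producer started"
        emit(1)
        emit(2)
    }
    events += "flow created" // nothing has been produced yet
    runBlocking {
        numbers.collect { events += "collected $it" }
    }
    return events
}
```

coldFlowEvents() returns ["flow created", "producer started", "collected 1", "collected 2"]; collecting the same flow again would run the producer block again.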

The code could look quite similar to what you already have:

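import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*

fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
    var currentRequest: Request? = client.requestForNextPage()

    while (currentRequest != null) {
        // Emit whole pages, so that the buffer below counts pages rather than rows
        emit(client.makeRequest(currentRequest))
        currentRequest = client.requestForNextPage()
    }
}
    .flowOn(Dispatchers.IO)                        // run the request loop on the IO dispatcher
    .buffer(capacity = 1)                          // let the producer fetch ahead of the consumer
    .transform { page -> emitAll(page.asFlow()) }  // flatten each page back into rows

Because the flow emits whole pages, the buffer counts pages: with a capacity of 1, one fetched page can wait in the buffer while the producer starts on the next request and the consumer is still working through the current page. The requests themselves still go out one at a time; a larger buffer lets the producer prefetch more pages, but it doesn't make the requests concurrent. You should check out this talk from KotlinConf 2019 to learn more about flows: https://www.youtube.com/watch?v=tYcqn48SMT8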
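To try the prefetching pattern without a real API, here is a self-contained sketch with a hypothetical in-memory client (FakePagedClient, fetchPage and rowsOf are invented for illustration). It emits whole pages, buffers one page, and flattens the pages back into rows on the consumer side.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical in-memory client: page i holds rows i*10, i*10+1, i*10+2,
// and pages 0 until pageCount exist.
class FakePagedClient(private val pageCount: Int) {
    suspend fun fetchPage(index: Int): List<Int>? {
        if (index >= pageCount) return null
        delay(20) // non-blocking stand-in for network latency
        return List(3) { index * 10 + it }
    }
}

// Emits whole pages so that buffer() counts pages, then flattens them into rows.
fun rowsOf(client: FakePagedClient): Flow<Int> = flow {
    var index = 0
    while (true) {
        val page = client.fetchPage(index++) ?: break // null means no more pages
        emit(page)
    }
}
    .flowOn(Dispatchers.IO)                        // producer runs on the IO dispatcher
    .buffer(capacity = 1)                          // up to one fetched page waits in the buffer
    .transform { page -> emitAll(page.asFlow()) }  // the collector sees individual rows
```

runBlocking { rowsOf(FakePagedClient(3)).toList() } yields [0, 1, 2, 10, 11, 12, 20, 21, 22]. Adding a delay in the collector and logging each fetch is an easy way to watch the producer stay ahead of the consumer.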

Upvotes: 3
