Reputation: 211
I need to process all of the results from a paged API endpoint. I'd like to present all of the results as a sequence.
I've come up with the following (slightly psuedo-coded):
suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
var currentRequest: Request? = client.requestForNextPage()
return withContext(Dispatchers.IO) {
sequence {
while(currentRequest != null) {
var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
currentRequest = client.requestForNextPage()
yieldAll(rowsInPage)
}
}
}
}
This functions but I'm not sure about a couple of things:
runBlocking
still happening with the IO dispatcher?Upvotes: 2
Views: 2050
Reputation: 211
I happened across suspendingSequence
in Kotlin's coroutines-examples
:
This is exactly what I was looking for.
Upvotes: 1
Reputation: 3890
Sequences are definitely not the thing you want to use in this case, because they are not designed to work in asynchronous environment. Perhaps you should take a look at flows and channels, but for your case the best and simplest choice is just a collection of deferred values, because you want to process all requests at once (flows and channels process them one-by-one, maybe with limited buffer size).
The following approach allows you to start all requests asynchronously (assuming that makeRequest
is suspended function and supports asynchronous requests). When you'll need your results, you'll need to wait only for the slowest request to finish.
fun getClientRequests(client: Client): List<Request> {
val requests = ArrayList<Request>()
var currentRequest: Request? = client.requestForNextPage()
while (currentRequest != null) {
requests += currentRequest
currentRequest = client.requestForNextPage()
}
return requests
}
// This function is not even suspended, so it finishes almost immediately
fun getAllRowsFromAPI(client: Client): List<Deferred<Page>> =
getClientRequests(client).map {
/*
* The better practice would be making getAllRowsFromApi an extension function
* to CoroutineScope and calling receiver scope's async function.
* GlobalScope is used here just for simplicity.
*/
GlobalScope.async(Dispatchers.IO) { client.makeRequest(it) }
}
fun main() {
val client = Client()
val deferredPages = getAllRowsFromAPI(client) // This line executes fast
// Here you can do whatever you want, all requests are processed in background
Thread.sleep(999L)
// Then, when we need results....
val pages = runBlocking {
deferredPages.map { it.await() }
}
println(pages)
// In your case you also want to "unpack" pages and get rows, you can do it here:
val rows = pages.flatMap { it.getRows() }
println(rows)
}
Upvotes: 0
Reputation: 28056
Question 1: The API-request will still run on the IO-dispatcher, but it will block the thread it's running on. This means that no other tasks can be scheduled on that thread while waiting for the request to finish. There's not really any reason to use runBlocking
in production-code at all, because:
makeRequest
is already a blocking call, then runBlocking
will do practically nothing.makeRequest
was a suspending call, then runBlocking
would make the code less efficient. It wouldn't yield the thread back to the pool while waiting for the request to finish.Whether makeRequest
is a blocking or non-blocking call depends on the client you're using. Here's a non-blocking http-client I can recommend: https://ktor.io/clients/
Question 2: I would use a Flow
for this purpose. You can think of it as a suspendable variant of Sequence
. Flows are cold, which means that it won't run before the consumer asks for its contents (in contrary to being hot, which means the producer will push new values no matter if the consumer wants it or not). A Kotlin Flow
has an operator called buffer
which you can use to make it request more pages before it has fully consumed the previous page.
The code could look quite similar to what you already have:
suspend fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
var currentRequest: Request? = client.requestForNextPage()
while(currentRequest != null) {
val rowsInPage = client.makeRequest(currentRequest)
emitAll(rowsInPage.asFlow())
currentRequest = client.requestForNextPage()
}
}.flowOn(Dispatchers.IO)
.buffer(capacity = 1)
The capacity of 1 means that will only make 1 more request while processing an earlier page. You could increase the buffer size to make more concurrent requests. You should check out this talk from KotlinConf 2019 to learn more about flows: https://www.youtube.com/watch?v=tYcqn48SMT8
Upvotes: 3