Reputation: 428
I am trying to run a 'batch' job in Firebase Firestore. Since a batch commit is asynchronous, and every batch processes only 500 documents, I have created an array of batches that I want to commit sequentially, so that I know exactly when the last batch is done and can then proceed with the next operation.
However, when I read up on how to do this in Kotlin, I came across a slew of jargon like runBlocking, Coroutine, Dispatcher, async, await, Context, Suspend, launch, join, Scope, Deferred, Continuation, CommonPool.
Furthermore, many of the posts say that things have changed in the latest version of Kotlin. The Kotlin documentation talks about runBlocking, but this post says runBlocking is a bad thing.
After some trial and error I got this to compile:
suspend fun doTheThing() {
    for (b in batchArray) {
        coroutineScope {
            val job = async { b.commit() }
            job.await()
        }
    }
}
However, now I am getting the error "Suspend function 'doTheThing' should be called only from a coroutine or another suspend function". I am confused at the moment. I just want to make those calls in sequence, or wait until all of them are done. I'm not sure what the right syntax is to get this done, or which concepts I am getting wrong.
Update: The following code snippet seems to be working:
for (b in batchArray) {
    runBlocking { b.commit() }
}
Is it a good practice to do it like this?
Upvotes: 3
Views: 7203
Reputation: 621
Please see the solution below. In the main function from which you start your batch job, you need to define on which thread all 500 documents will be processed. So you initialise a CoroutineScope with the IO dispatcher and call your main processing method inside it.
Kotlin has three commonly used dispatchers: Dispatchers.Main, Dispatchers.IO and Dispatchers.Default (there is also Dispatchers.Unconfined).
Now, since you want all 500 documents to be processed in parallel, you create a synchronous block inside that background coroutine. This synchronous block will not complete until all of the asynchronous (.commit) operations inside it have completed.
I think this way you can achieve the behavior you want. Please see the code below:
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun initialFunction() {
    // inside this function, start the coroutine using launch;
    // Dispatchers.IO runs the coroutine on a background (IO) thread
    CoroutineScope(Dispatchers.IO).launch {
        // call your method which will process the batch job
        doTheThing()
    }
}

suspend fun doTheThing() {
    // now start the blocking call; runBlocking does not return
    // until every coroutine launched inside it has completed
    runBlocking {
        for (b in batchArray) {
            // each batch gets its own nested coroutine,
            // which waits for its job to complete
            launch {
                val job = async { b.commit() }
                job.await()
            }
        }
    }
}
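If the goal is to commit the batches in parallel and continue only after all of them have finished, a sketch that does this without calling runBlocking inside a suspend function could look like this. It assumes kotlinx-coroutines-core is on the classpath; commitBatch and commitAllInParallel are hypothetical names, and commitBatch only simulates a suspending commit with a delay (with Firestore it would be something like b.commit().await()).

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a suspending commit such as batch.commit().await();
// it only simulates some I/O latency and returns the batch id.
suspend fun commitBatch(id: Int): Int {
    delay(50L)
    return id
}

// Starts every commit concurrently and suspends (without blocking a thread)
// until all of them have finished. If one commit throws, coroutineScope
// cancels the remaining ones and rethrows the exception.
suspend fun commitAllInParallel(ids: List<Int>): List<Int> = coroutineScope {
    ids.map { id -> async { commitBatch(id) } }.awaitAll()
}

fun main() = runBlocking {
    // runBlocking is used here only to bridge a plain JVM main() into coroutines.
    val done = commitAllInParallel(listOf(1, 2, 3))
    println("All batches committed: $done")
}
```

awaitAll returns the results in the same order as the Deferred list, and coroutineScope only returns once every child is done, so no blocking call is needed to "wait for all".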
Upvotes: 0
Reputation: 200168
Is it a good practice to do it like this?
No, runBlocking is definitely the wrong thing to do. It will block the main thread of your application and possibly crash it with an ANR. However, the particular way you wrote the code means that you can just as well remove runBlocking and get the exact same behavior. b.commit() is a plain async call; it immediately returns a Task object, which means you haven't achieved your desired goal of waiting for a batch to complete before submitting the next one.
Now, on to a correct solution that leverages coroutines.
Put the org.jetbrains.kotlinx:kotlinx-coroutines-play-services dependency on the classpath. This will give you the suspend fun Task.await() extension function and allow you to construct a suspending call b.commit().await(), which doesn't complete until the batch is committed.
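Roughly speaking, an extension like Task.await() bridges a callback-based asynchronous API into a suspending call. The sketch below illustrates that general technique with suspendCancellableCoroutine; FakeTask and awaitResult are hypothetical stand-ins, not the Play Services API, and the real Task.await() differs in its details. It assumes kotlinx-coroutines-core on the classpath.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical callback-based API standing in for a Firestore Task:
// the work runs on another thread and reports back through callbacks.
class FakeTask<T>(private val work: () -> T) {
    fun addListener(onSuccess: (T) -> Unit, onFailure: (Throwable) -> Unit) {
        thread {
            try {
                onSuccess(work())
            } catch (e: Throwable) {
                onFailure(e)
            }
        }
    }
}

// Suspends the calling coroutine until the task reports a result,
// without blocking the calling thread in the meantime.
suspend fun <T> FakeTask<T>.awaitResult(): T =
    suspendCancellableCoroutine { cont ->
        addListener(
            onSuccess = { value -> cont.resume(value) },
            onFailure = { error -> cont.resumeWithException(error) }
        )
    }

fun main() = runBlocking {
    val order = mutableListOf<String>()
    for (name in listOf("batch-1", "batch-2")) {
        FakeTask { Thread.sleep(20L); name }.awaitResult()
        order += name // runs only after that batch's task has finished
    }
    println(order)
}
```

A failure reported by the callback is rethrown from awaitResult(), so the caller can use an ordinary try/catch around each awaited commit.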
With that in place you can write your function like this:
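import com.google.firebase.firestore.WriteBatch
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.tasks.await

fun CoroutineScope.doTheThing(batchArray: List<WriteBatch>) {
    launch {
        for (b in batchArray) {
            // suspends until this batch is committed, without blocking a thread
            b.commit().await()
        }
        // add logic here for what to do when all batches are done
    }
}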
In order to call it, you need a CoroutineScope. If you don't know yet about structured concurrency and how to use it, take a look at the doc of CoroutineScope for a quick start.
Note that the caller of doTheThing won't block until all the batches are done; it will instead launch a coroutine in the background and continue. The launched coroutine will, however, suspend while a batch is in progress, resume when it's done, start the next job, suspend, and so on until all are done. While it's suspended, it won't occupy a thread.
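To see the "caller continues, batches run one after another" behaviour without Firestore, here is a small sketch. It assumes kotlinx-coroutines-core; commitBatch and commitSequentially are hypothetical names, and commitBatch simulates a suspending commit with delay (with Firestore it would be b.commit().await()).

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical suspending commit that records when it starts and ends.
suspend fun commitBatch(id: Int, log: MutableList<String>) {
    log += "start $id"
    delay(30L) // suspends; the thread is free to run other coroutines
    log += "end $id"
}

// Same shape as doTheThing above: launch one coroutine that commits the
// batches one after another, and hand back its Job immediately.
fun CoroutineScope.commitSequentially(ids: List<Int>, log: MutableList<String>): Job =
    launch {
        for (id in ids) {
            commitBatch(id, log)
        }
        log += "all done"
    }

fun main() = runBlocking {
    val log = mutableListOf<String>()
    val job = commitSequentially(listOf(1, 2), log)
    log += "caller continues" // recorded before any batch has finished
    job.join()                // only needed here so main() can print the full log
    println(log)
}
```

Each "end" entry comes before the next "start" entry, which is the sequential ordering the question asks for, while the caller of commitSequentially is never blocked.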
Upvotes: 1
Reputation: 528
Coroutines are usually created by builders (launch, async, etc.) in the context of some coroutine scope. Like the code inside those builders, a suspend function runs inside a coroutine, so it can only be called from a coroutine or from another suspend function; a coroutine, in turn, is started from a scope, either one provided for you or one you define explicitly.
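A minimal sketch of that rule, assuming kotlinx-coroutines-core; doWork, doMoreWork and startWork are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A suspend function: it may only be called from a coroutine or another suspend function.
suspend fun doWork(): String {
    delay(10L)
    return "done"
}

// Fine: one suspend function calling another.
suspend fun doMoreWork(): String = doWork() + " twice"

// A regular function cannot call doWork() directly (that is exactly the
// "should be called only from a coroutine" error). Instead it starts a
// coroutine with a builder such as launch on some CoroutineScope.
fun startWork(scope: CoroutineScope, onResult: (String) -> Unit) {
    scope.launch {
        onResult(doMoreWork())
    }
}

fun main() = runBlocking {
    // runBlocking's own scope is passed in, so main waits for the child coroutine.
    startWork(this) { result -> println(result) }
}
```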
CoroutineScope is an interface with a single property, coroutineContext. You can create your own scope by implementing the CoroutineScope interface and overriding coroutineContext.
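import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job

val myCoroutineScope = object : CoroutineScope {
    // One Job, created once; Dispatchers.Main needs Android (or another UI platform)
    override val coroutineContext = Job() + Dispatchers.Main
}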
From your scope, you can use builders like launch, async, produce etc.
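As a sketch of how the two most common builders differ, assuming kotlinx-coroutines-core (launchVersusAsync is a hypothetical name): launch gives back a Job with no result, while async gives back a Deferred whose await() suspends until the value is ready.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the value produced by async and whether the launched Job completed.
suspend fun launchVersusAsync(): Pair<Int, Boolean> = coroutineScope {
    val job: Job = launch {
        delay(20L) // stands in for work whose result we don't need
    }
    val deferred: Deferred<Int> = async {
        delay(10L) // stands in for work that produces a value
        21 * 2
    }
    val value = deferred.await() // suspends until async has produced its value
    job.join()                   // wait for the fire-and-forget work as well
    value to job.isCompleted
}

fun main() = runBlocking {
    val (value, jobDone) = launchVersusAsync()
    println("async produced $value, launch finished: $jobDone")
}
```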
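You can refactor your function to:
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.tasks.await

suspend fun doTheThing() = coroutineScope {
    for (b in batchArray) {
        // Task.await() (kotlinx-coroutines-play-services) suspends until the commit finishes
        b.commit().await()
    }
}

fun main(args: Array<String>) {
    myCoroutineScope.launch {
        doTheThing()
        println("Completed")
    }
}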
I used launch here since we don't really care about the result. The suspend function suspends the calling coroutine until it completes its execution, so "Completed" is printed only after doTheThing() returns.
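The reason a coroutineScope-based suspend function can be awaited this way is that coroutineScope does not return until every coroutine started inside it has completed. A small sketch of that guarantee, assuming kotlinx-coroutines-core (runAllChildren is a hypothetical name):

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches three children with different delays; coroutineScope only
// returns after all of them have finished.
suspend fun runAllChildren(log: MutableList<String>) {
    coroutineScope {
        for (i in 1..3) {
            launch {
                delay(10L * (4 - i)) // later children finish first
                log += "child $i"
            }
        }
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    runAllChildren(log)
    log += "completed" // always the last entry
    println(log)
}
```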
You can also choose to run the coroutine on another dispatcher:
fun main(args: Array<String>) {
    myCoroutineScope.launch(Dispatchers.IO) {
        doTheThing()
        println("Completed")
    }
}
If we don't want the failure of one child coroutine to cancel the whole scope (and its other children), we use a SupervisorJob in place of the regular Job.
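import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob

val myCoroutineScope = object : CoroutineScope {
    // created once, so every launch in this scope shares the same SupervisorJob
    override val coroutineContext = SupervisorJob() + Dispatchers.Default
}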
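To illustrate the difference, here is a sketch that launches one failing child and one slow child in a scope built on either a plain Job or a SupervisorJob, and reports whether the slow child got to finish. It assumes kotlinx-coroutines-core; siblingSurvives is a hypothetical name, and runBlocking is used only to make the demo runnable from a plain JVM main().

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if the slow sibling completed despite the other child failing.
fun siblingSurvives(parent: Job): Boolean = runBlocking {
    var finished = false
    // Swallows the failure so it is not reported as an uncaught exception.
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)
    val failing = scope.launch { throw IllegalStateException("batch failed") }
    val slow = scope.launch {
        delay(500L)
        finished = true
    }
    joinAll(failing, slow) // join returns normally even if a child failed or was cancelled
    scope.cancel()
    finished
}

fun main() {
    // With a plain Job the failure cancels the parent, which cancels the sibling.
    println("Job: sibling finished = ${siblingSurvives(Job())}")
    // With a SupervisorJob the failure stays local to the failing child.
    println("SupervisorJob: sibling finished = ${siblingSurvives(SupervisorJob())}")
}
```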
Upvotes: 2