Reputation: 231
I'm trying to create an object that can execute tasks sequentially in its own thread, as if it were a queue.
The following sample only demonstrates my setup and may be completely wrong.
import kotlinx.coroutines.*

class CoroutinesTest {
    fun a() {
        GlobalScope.launch {
            println("a started")
            delay(1000)
            println("a completed")
        }
    }

    fun b() {
        GlobalScope.launch {
            println("b started")
            delay(2000)
            println("b completed")
        }
    }

    fun complex() {
        a()
        b()
    }
}

fun main() {
    runBlocking {
        val coroutinesTest = CoroutinesTest()
        coroutinesTest.complex()
        // keep the process alive long enough for both coroutines to finish
        delay(10000)
    }
}
For now, this code prints the following:
a started
b started
a completed
b completed
which means a() and b() are executed in parallel. The methods a(), b() and complex() can be called from different threads. Of course, complex() should also support this concept. For now, I need a mechanism that executes only one task at a time, so that I get the following output:
a started
a completed
b started
b completed
I did some research and think that an actor with a Channel can do what is needed, but actor is currently marked as obsolete (issue #87). I don't like the idea of using an API that is subject to change, so I would like to do this in a standard way.
Upvotes: 21
Views: 13030
Reputation: 10940
TL;DR There are a few options for controlling sequential coroutines:
- Channel, to make them run one at a time in the order called.
- Mutex, to make them run one at a time but without a guarantee of order.
- Flow (as described in the answer below by BigSt), to make them run one at a time in the order called. However, make sure that the flow buffer is large enough, or jobs can be lost if the number of jobs "in flight" is larger than the buffer size.

One way to control execution order is to use a Channel, where lazily executed coroutine jobs are passed to the channel to be run in sequence. Unlike the Mutex, the Channel guarantees that the jobs are run in the order they are launched.
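import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

class CoroutinesTest {
    // Unbounded queue of lazy jobs; a single consumer starts and joins them one by one
    private val channel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
        GlobalScope.launch {
            consumeEach { it.join() }
        }
    }

    fun a() {
        channel.trySend(
            // LAZY: the coroutine does not start until the consumer calls join()
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("a started")
                delay(1000)
                println("a completed")
            }
        )
    }

    fun b() {
        channel.trySend(
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("b started")
                delay(2000)
                println("b completed")
            }
        )
    }

    fun complex() {
        // add two separate jobs to the channel,
        // this will run a, then b
        a()
        b()
    }
}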
Calling complex() always produces:
a started
a completed
b started
b completed
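A variation on the same idea, in case it helps: instead of sending lazy Jobs, you can send the suspend blocks themselves and let a single worker coroutine run them. This is only a sketch under my own assumptions. SerialQueue, submit, closeAndJoin and runSerialQueueDemo are hypothetical names, not part of kotlinx.coroutines, and the scope is passed in rather than taken from GlobalScope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.Collections

// Hypothetical helper: runs submitted blocks one at a time, in submission order.
class SerialQueue(scope: CoroutineScope) {
    private val tasks = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    // Single consumer: the next task starts only after the previous one returns.
    // Note: an exception thrown by a task would stop this worker.
    private val worker = scope.launch {
        for (task in tasks) task()
    }

    // Callable from any thread. Returns false once the channel has been closed.
    fun submit(task: suspend () -> Unit): Boolean = tasks.trySend(task).isSuccess

    // Stops accepting new tasks and waits for the queued ones to finish.
    suspend fun closeAndJoin() {
        tasks.close()
        worker.join()
    }
}

// Demo with short delays; the log list is only there to make the order observable.
fun runSerialQueueDemo(): List<String> = runBlocking {
    val log = Collections.synchronizedList(mutableListOf<String>())
    val queue = SerialQueue(CoroutineScope(Dispatchers.Default))
    queue.submit { log += "a started"; delay(100); log += "a completed" }
    queue.submit { log += "b started"; delay(50); log += "b completed" }
    queue.closeAndJoin()
    log.toList()
}
```

Because there is exactly one consumer, b cannot start before a returns, even though b has the shorter delay.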
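You can keep jobs from running at the same time with a Mutex and a withLock call. The call order is not guaranteed if you make a bunch of calls in quick succession, because each call launches a new coroutine on the multi-threaded default dispatcher and whichever coroutine reaches the lock first runs first. For example: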
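import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class CoroutinesTest {
    private val lock = Mutex()

    fun a() {
        GlobalScope.launch {
            // suspends until no other coroutine holds the lock
            lock.withLock {
                println("a started")
                delay(1000)
                println("a completed")
            }
        }
    }

    fun b() {
        GlobalScope.launch {
            lock.withLock {
                println("b started")
                delay(2000)
                println("b completed")
            }
        }
    }

    fun complex() {
        a()
        b()
    }
}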
Calling complex() can produce:
a started
a completed
b started
b completed
or:
b started
b completed
a started
a completed
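To see the "one at a time, but in any order" behaviour for yourself, something like the following sketch could be used. runMutexDemo and isSerialized are hypothetical helper names of mine, and the job count and delays are arbitrary. It records events in a list and checks that every "started" entry is directly followed by its own "completed" entry, without assuming anything about which job goes first.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Illustrative only: launches several jobs that share one Mutex and records their events.
fun runMutexDemo(jobCount: Int = 5): List<String> = runBlocking {
    val lock = Mutex()
    val log = mutableListOf<String>() // only modified while the lock is held
    val jobs = (1..jobCount).map { id ->
        launch(Dispatchers.Default) {
            lock.withLock {
                log += "$id started"
                delay(20)
                log += "$id completed"
            }
        }
    }
    jobs.joinAll()
    log
}

// True when every "started" entry is directly followed by the matching "completed" entry.
fun isSerialized(log: List<String>): Boolean =
    log.chunked(2).all { (start, end) ->
        start.endsWith(" started") && end == start.replace(" started", " completed")
    }
```

The ids in the log may appear in any order between runs, but isSerialized(runMutexDemo()) should hold every time.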
If you must always run a then b, you can make both of them suspend functions and call them from within a single scope (only allowing the complex() call, not individual a() and b() calls). In this case, the complex() call does guarantee that a runs and completes before b starts.
import kotlinx.coroutines.*

class CoroutinesTest {
    suspend fun aImpl() {
        println("a started")
        delay(1000)
        println("a completed")
    }

    suspend fun bImpl() {
        println("b started")
        delay(2000)
        println("b completed")
    }

    fun complex() {
        // one coroutine runs both steps, so bImpl() cannot start before aImpl() returns
        GlobalScope.launch {
            aImpl()
            bImpl()
        }
    }
}
Calling complex() always produces:
a started
a completed
b started
b completed
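If a() and b() still need to be callable on their own while complex() stays atomic, one possible combination of the suspend functions with a Mutex is sketched below. LockedTasks, its log parameter and runLockedTasksDemo are hypothetical names I made up for illustration, and the delays are shortened. The point to notice is that complex() takes the lock once around both steps, since the coroutines Mutex is not reentrant.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical variant; the log parameter exists only to make the order observable.
class LockedTasks(private val log: MutableList<String>) {
    private val lock = Mutex()

    private suspend fun aImpl() { log += "a started"; delay(30); log += "a completed" }
    private suspend fun bImpl() { log += "b started"; delay(10); log += "b completed" }

    suspend fun a() = lock.withLock { aImpl() }
    suspend fun b() = lock.withLock { bImpl() }

    // Takes the lock once around both steps. Calling a() and b() inside withLock
    // would deadlock, because the Mutex is not reentrant.
    suspend fun complex() = lock.withLock {
        aImpl()
        bImpl()
    }
}

fun runLockedTasksDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>() // only modified while the lock is held
    val tasks = LockedTasks(log)
    val jobs = listOf(
        launch(Dispatchers.Default) { tasks.complex() },
        launch(Dispatchers.Default) { tasks.b() }
    )
    jobs.joinAll()
    log
}
```

The standalone b() call may run before or after complex(), but it cannot land between the a and b steps of complex().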
Upvotes: 30
Reputation: 30715
Flows are sequential. Using MutableSharedFlow, this can be achieved as follows:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

class CoroutinesTest {
    // Make sure replay and extraBufferCapacity are large enough to handle all the jobs.
    // replay covers jobs emitted before sharedFlow starts being collected, which could otherwise be lost.
    // If some jobs are lost, try increasing either value.
    private val sharedFlow = MutableSharedFlow<Job>(replay = 2, extraBufferCapacity = 2)

    init {
        sharedFlow.onEach { job ->
            job.join()
        }.launchIn(GlobalScope)
    }

    fun a() {
        // emit job to the Flow to execute sequentially
        sharedFlow.tryEmit(
            // using CoroutineStart.LAZY here to start a coroutine when join() is called
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("a started")
                delay(1000)
                println("a completed")
            }
        )
    }

    fun b() {
        // emit job to the Flow to execute sequentially
        sharedFlow.tryEmit(
            // using CoroutineStart.LAZY here to start a coroutine when join() is called
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("b started")
                delay(2000)
                println("b completed")
            }
        )
    }

    fun complex() {
        a()
        b()
    }
}
Note: using GlobalScope is not recommended, because it violates the principle of structured concurrency.
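As a rough illustration of the alternative, the class can own a CoroutineScope and cancel it when it is no longer needed. This is a minimal sketch, not a drop-in replacement for the code above. ScopedTasks, close() and runScopeDemo are hypothetical names, and the long delay only stands in for real work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical class: owns its scope instead of using GlobalScope.
class ScopedTasks {
    // SupervisorJob: one failing task does not cancel the others
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    fun a(): Job = scope.launch {
        println("a started")
        delay(60_000) // stands in for long-running work
        println("a completed")
    }

    // Cancels every coroutine launched through this object.
    fun close() {
        scope.cancel()
    }
}

fun runScopeDemo(): Boolean = runBlocking {
    val tasks = ScopedTasks()
    val job = tasks.a()
    tasks.close()   // cancels the job whether or not it has started yet
    job.join()
    job.isCancelled
}
```

With GlobalScope there is no such handle: launched coroutines keep running until they finish on their own or the process exits.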
Upvotes: 2
Reputation: 28866
Old question, but here's a simpler approach anyway. Change a() to return the coroutine Job:
fun a() = GlobalScope.launch {
    println("a started")
    delay(1000)
    println("a completed")
}
Then you can invoke a() / b() like this:
a().invokeOnCompletion { b() }
This way b() won't be triggered before a() terminates.
Alternatively you can use join:
fun complex() {
    GlobalScope.launch {
        a().join()
        b()
    }
}
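One caveat with invokeOnCompletion, shown as a small sketch: the handler also runs when the job fails or is cancelled, and it receives the cause (null on normal completion). If the follow-up should run only after success, you can check it. runCompletionDemo and the log list are hypothetical, just to make the behaviour visible.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections

// Illustrative only: registers a follow-up on one job that completes and one that is cancelled.
fun runCompletionDemo(): List<String> = runBlocking {
    val log = Collections.synchronizedList(mutableListOf<String>())

    val ok = launch { delay(10) }
    // cause == null means the job completed normally
    ok.invokeOnCompletion { cause -> if (cause == null) log += "after ok" }

    val cancelled = launch { delay(60_000) }
    cancelled.invokeOnCompletion { cause -> if (cause == null) log += "after cancelled" }
    cancelled.cancel() // the handler still runs, but with a CancellationException as cause

    joinAll(ok, cancelled)
    log.toList()
}
```

Without the cause check, the follow-up for the cancelled job would run as well.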
Upvotes: 3