Anykey Skylocker

Reputation: 231

Kotlin Coroutines sequential execution

I'm trying to create an object that executes tasks sequentially in its own thread, like a queue.

The following sample is just for demonstrating my setup and may be completely wrong.

import kotlinx.coroutines.*

class CoroutinesTest {
    fun a() {
        GlobalScope.launch {
            println("a started")
            delay(1000)
            println("a completed")
        }
    }

    fun b() {
        GlobalScope.launch {
            println("b started")
            delay(2000)
            println("b completed")
        }
    }

    fun complex() {
        a()
        b()
    }
}

fun main() {
    runBlocking {
        val coroutinesTest = CoroutinesTest()

        coroutinesTest.complex()

        delay(10000)
    }
}

Currently this code prints the following:

a started
b started
a completed
b completed

which means a and b were executed in parallel. Methods a, b and complex can be called from different threads, and the complex method should of course support this too. I need a mechanism that executes only one task at a time, so that I get the following output:

a started
a completed
b started
b completed

I did some research and think that an actor with a Channel can do what is needed, but actor is currently marked as obsolete (issue #87). I don't like the idea of using an API that is subject to change, so I would like to do this in a standard way.

Upvotes: 21

Views: 13030

Answers (3)

Tyler V

Reputation: 10940

TL;DR There are a few options for controlling sequential coroutines.

  1. Use a Channel to make them run one at a time in the order called
  2. Use a Mutex to make them run one at a time but without a guarantee of order
  3. Use a Flow (as described in the MutableSharedFlow answer below) to make them run one at a time in the order called. Make sure the flow buffer is large enough, because jobs can be lost if the number of jobs "in flight" exceeds the buffer size.
  4. If the desired sequence is always the same, put the actual work into suspend functions and call the sequence from within the same coroutine scope to make them run one at a time in the order prescribed by the code

Channel

One way to control execution order is to use a Channel: lazily started coroutine jobs are sent to the channel, and a single consumer joins them one at a time. Unlike the Mutex, the Channel guarantees that the jobs run in the order they were sent.

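import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

class CoroutinesTest {

    // Unlimited capacity, so trySend does not fail while the channel is open
    private val channel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
        GlobalScope.launch {
            // join() starts each lazy job and suspends until it completes
            consumeEach { it.join() }
        }
    }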

    fun a() {
        channel.trySend(
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("a started")
                delay(1000)
                println("a completed")
            }
        )
    }

    fun b() {
        channel.trySend(
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("b started")
                delay(2000)
                println("b completed")
            }
        )
    }

    fun complex() {
        // add two separate jobs to the channel,
        // this will run a, then b
        a()
        b()
    }
}

Calling complex always produces:

a started
a completed
b started
b completed
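
As a variation on the Channel approach, the queue can carry suspend blocks instead of lazy Jobs and run them in a scope supplied by the caller. This is only a sketch: SerialQueue, submit, close and runChannelDemo are hypothetical names that do not appear in the answer above, and error handling is left out (an exception thrown by a task would stop the consumer).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: runs submitted blocks one at a time, in submission order.
class SerialQueue(scope: CoroutineScope) {
    private val tasks = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    init {
        // Single consumer: each block finishes before the next one is received.
        scope.launch {
            for (task in tasks) task()
        }
    }

    // Returns false if the task was not accepted (for example after close()).
    fun submit(task: suspend () -> Unit): Boolean = tasks.trySend(task).isSuccess

    // Lets the consumer finish once the already queued tasks have run.
    fun close() {
        tasks.close()
    }
}

// Demo that records events in a list instead of printing them.
fun runChannelDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val done = CompletableDeferred<Unit>()
    val queue = SerialQueue(this)

    queue.submit { events.add("a started"); delay(100); events.add("a completed") }
    queue.submit { events.add("b started"); delay(50); events.add("b completed") }
    queue.submit { done.complete(Unit) } // marker task: signals that the queue is drained

    done.await()
    queue.close()
    events
}

fun main() {
    println(runChannelDemo())
}
```

Because the scope is passed in, cancelling it also stops the consumer, which is harder to arrange with GlobalScope.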

Mutex

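You can keep jobs from running at the same time with a Mutex and a withLock call. The execution order is not guaranteed if you make several calls in quick succession, because each call launches its own coroutine and whichever one reaches withLock first acquires the lock. For example: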

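import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class CoroutinesTest {
    // Only one coroutine can hold the lock at a time
    private val lock = Mutex()
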
    fun a() {
        GlobalScope.launch {
            lock.withLock {
                println("a started")
                delay(1000)
                println("a completed")
            }
        }
    }

    fun b() {
        GlobalScope.launch {
            lock.withLock {
                println("b started")
                delay(2000)
                println("b completed")
            }
        }
    }

    fun complex() {
        a()
        b()
    }
}

Calling complex can produce:

a started
a completed
b started
b completed

or:

b started
b completed
a started
a completed
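
Whatever the order, the Mutex still ensures that the tasks never overlap. One way to check this is to count how many tasks are inside the locked section at the same moment. The sketch below does that; maxConcurrentTasks is a hypothetical helper written for this illustration, not part of the answer's class.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical check: returns the highest number of tasks that were
// inside the locked section at the same time.
fun maxConcurrentTasks(taskCount: Int): Int = runBlocking {
    val lock = Mutex()
    val active = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)

    val jobs = List(taskCount) {
        // Dispatchers.Default uses several threads, as GlobalScope.launch does.
        launch(Dispatchers.Default) {
            lock.withLock {
                val now = active.incrementAndGet()
                maxSeen.updateAndGet { prev -> maxOf(prev, now) }
                delay(10) // suspension point: the lock stays held across it
                active.decrementAndGet()
            }
        }
    }
    jobs.joinAll()
    maxSeen.get()
}

fun main() {
    println(maxConcurrentTasks(10)) // prints 1
}
```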

Suspend Functions

If you must always run a then b you can make both of them suspend functions and call them from within a single scope (only allowing the complex call, not individual a and b calls). In this case, the complex call does guarantee that a runs and completes before starting b.

import kotlinx.coroutines.*

class CoroutinesTest {

    suspend fun aImpl() {
        println("a started")
        delay(1000)
        println("a completed")
    }

    suspend fun bImpl() {
        println("b started")
        delay(2000)
        println("b completed")
    }

    fun complex() {
        GlobalScope.launch {
            aImpl()
            bImpl()
        }
    }
}

Calling complex always produces:

a started
a completed
b started
b completed
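
If a, b and complex all have to stay callable, the suspend functions can be combined with a Mutex. The following is a rough sketch under a few assumptions: SerializedTasks and runSerializedDemo are hypothetical names, the scope is injected instead of using GlobalScope, and events are collected in a list instead of being printed. Since Mutex is not reentrant, complex calls the unlocked implementations directly rather than going through a() and b().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical variant of the class above.
class SerializedTasks(private val scope: CoroutineScope) {
    private val lock = Mutex()
    val events = mutableListOf<String>() // only modified while the lock is held

    private suspend fun aImpl() {
        events.add("a started")
        delay(100)
        events.add("a completed")
    }

    private suspend fun bImpl() {
        events.add("b started")
        delay(50)
        events.add("b completed")
    }

    fun a(): Job = scope.launch { lock.withLock { aImpl() } }

    fun b(): Job = scope.launch { lock.withLock { bImpl() } }

    // Holds the lock for the whole sequence, so no other task can run
    // between aImpl() and bImpl().
    fun complex(): Job = scope.launch {
        lock.withLock {
            aImpl()
            bImpl()
        }
    }
}

fun runSerializedDemo(): List<String> = runBlocking {
    val tasks = SerializedTasks(this)
    joinAll(tasks.complex(), tasks.a())
    tasks.events
}

fun main() {
    println(runSerializedDemo())
}
```

As with the plain Mutex version, the order between separate calls still depends on which coroutine reaches the lock first when the scope uses a multi-threaded dispatcher.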

Upvotes: 30

Sergio

Reputation: 30715

Flows are collected sequentially, so the same effect can be achieved with a MutableSharedFlow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

class CoroutinesTest {

    // Make sure replay (which retains jobs emitted before the flow is collected)
    // and extraBufferCapacity are large enough to hold all pending jobs.
    // If jobs are being lost, try increasing either value.
    private val sharedFlow = MutableSharedFlow<Job>(replay = 2, extraBufferCapacity = 2)

    init {
        sharedFlow.onEach { job ->
            job.join()
        }.launchIn(GlobalScope)
    }

    
    fun a() {
        // emit job to the Flow to execute sequentially
        sharedFlow.tryEmit(
            // using CoroutineStart.LAZY here to start a coroutine when join() is called
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("a started")
                delay(1000)
                println("a completed")
            }
        )
    }

    fun b() {
        // emit job to the Flow to execute sequentially
        sharedFlow.tryEmit(
            // using CoroutineStart.LAZY here to start a coroutine when join() is called
            GlobalScope.launch(start = CoroutineStart.LAZY) {
                println("b started")
                delay(2000)
                println("b completed")
            }
        )
    }


    fun complex() {
        a()
        b()
    }
}

Note: using GlobalScope is not recommended, because it violates the principle of structured concurrency.
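
A possible way to follow that note is to inject the scope, and to surface the Boolean returned by tryEmit so that a job rejected by a full buffer does not go unnoticed. This is a sketch only: FlowQueue, submit, stop, runFlowDemo and the default capacity of 64 are assumptions made for the illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Hypothetical variant of the class above with an injected scope.
class FlowQueue(private val scope: CoroutineScope, capacity: Int = 64) {
    private val jobs = MutableSharedFlow<Job>(replay = capacity, extraBufferCapacity = capacity)

    // Single collector: joins one job at a time, in emission order.
    private val collector: Job = jobs.onEach { it.join() }.launchIn(scope)

    // Returns false when the flow did not accept the job.
    fun submit(block: suspend CoroutineScope.() -> Unit): Boolean {
        val job = scope.launch(start = CoroutineStart.LAZY, block = block)
        val accepted = jobs.tryEmit(job)
        if (!accepted) job.cancel() // do not leave a lazy job that would never start
        return accepted
    }

    fun stop() {
        collector.cancel()
    }
}

fun runFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val done = CompletableDeferred<Unit>()
    val queue = FlowQueue(this)

    queue.submit { events.add("a started"); delay(100); events.add("a completed") }
    queue.submit { events.add("b started"); delay(50); events.add("b completed") }
    queue.submit { done.complete(Unit) } // marker job: signals that the queue is drained

    done.await()
    queue.stop()
    events
}

fun main() {
    println(runFlowDemo())
}
```

Note that the replay cache keeps references to jobs that have already completed, so a very large replay value also retains more finished jobs.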

Upvotes: 2

Emanuel Moecklin

Reputation: 28866

Old question, but here's a simpler approach anyway. Change a() to return the coroutine's Job:

fun a() = GlobalScope.launch {
    println("a started")
    delay(1000)
    println("a completed")
}

Then you can invoke a() / b() like this:

a().invokeOnCompletion { b() }

This way b() won't be triggered before a() terminates. Note that the completion handler also runs when a() fails or is cancelled.

Alternatively you can use join:

fun complex() {
    GlobalScope.launch {
        a().join()
        b()
    }
}
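
The handler passed to invokeOnCompletion receives the cause of completion, which is null when the job finished normally. If b should only run after a successful a, the cause can be checked. A minimal sketch, assuming a hypothetical runChainDemo helper that records events in a list and runs everything inside runBlocking rather than GlobalScope:

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: b is started only if a completed normally.
fun runChainDemo(cancelA: Boolean): List<String> = runBlocking {
    val events = mutableListOf<String>()

    fun a(): Job = launch {
        events.add("a started")
        delay(100)
        events.add("a completed")
    }

    fun b(): Job = launch {
        events.add("b started")
        delay(50)
        events.add("b completed")
    }

    val first = a()
    // cause is null on normal completion, non-null on failure or cancellation
    first.invokeOnCompletion { cause ->
        if (cause == null) b()
    }

    if (cancelA) {
        yield()        // give a() a chance to start
        first.cancel() // b() must not run in this case
    }
    first.join()
    events // runBlocking returns only after its child coroutines have finished
}

fun main() {
    println(runChainDemo(cancelA = false))
    println(runChainDemo(cancelA = true))
}
```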

Upvotes: 3
