Víctor Albertos

Reputation: 8293

How to unit test that a MutableSharedFlow<T>(replay=0) has emitted a value?

I'm not able to figure out how to test that a SharedFlow with replay=0 emitted a value.

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking
import org.junit.Test

class ShowcaseTest {

    @Test
    fun testIntSharedFlowFlow() {
        val intSharedFlow = MutableSharedFlow<Int>()

        runBlocking {
            intSharedFlow.emit(1)
        }
        
        // Does not work as there is no buffer because MutableSharedFlow(replay=0)
        assert(intSharedFlow.replayCache.first() == 1)
    }
}

Upvotes: 22

Views: 13524

Answers (4)

Eric Cen

Reputation: 4386

runBlockingTest has been deprecated since kotlinx-coroutines-test 1.6.0; use runTest instead. See the kotlinx-coroutines-test migration guide.

@Test
fun `test shared flow with deferred`() = runTest(UnconfinedTestDispatcher()) {
    val sharedFlow = MutableSharedFlow<Int>(replay = 0)

    val deferred = async {
        sharedFlow.first()
    }

    sharedFlow.emit(1)
   
    assertEquals(1, deferred.await())
}

Upvotes: 12

ycesar

Reputation: 440

One solution would be to use the Deferred object returned by async:

@Test
fun `test shared flow with deferred`() = runBlockingTest {
    val sharedFlow = MutableSharedFlow<Int>(replay = 0)

    val deferred = async {
        sharedFlow.first()
    }

    sharedFlow.emit(1)
   
    assertEquals(1, deferred.await())
}

Note: you must use runBlockingTest so that the async body is executed immediately. Otherwise emit(1) could run before first() has subscribed, and with replay=0 the value would be lost.

Upvotes: 8

Javier Antón

Reputation: 665

If you want to test with replay=1, you can try to emit before "observing/collecting", so before the job started.

@Test
fun testIntSharedFlowFlow() = runBlockingTest {
    val _intSharedFlow = MutableSharedFlow<Int>()
    val intSharedFlow: SharedFlow<Int> = _intSharedFlow
    val testResults = mutableListOf<Int>()

    val job = launch {
        intSharedFlow.toList(testResults)
    }
    _intSharedFlow.emit(5) 
    
    assertEquals(1, testResults.size)
    assertEquals(5, testResults.first())
    job.cancel()
}

Don't forget to cancel the job; otherwise the collector will keep collecting from the sharedFlow, and the test will either fail with an error or never finish.
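With runTest, the manual cancellation can be avoided by launching the collector in backgroundScope. The following is a minimal sketch, assuming a kotlinx-coroutines-test version that provides backgroundScope (1.7.0 or later, as far as I know); the class and test names are made up for illustration.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

@OptIn(ExperimentalCoroutinesApi::class)
class SharedFlowBackgroundScopeTest { // hypothetical class name

    @Test
    fun collectsEmittedValue() = runTest {
        val sharedFlow = MutableSharedFlow<Int>(replay = 0)
        val collected = mutableListOf<Int>()

        // The unconfined test dispatcher starts the collector eagerly,
        // so it is subscribed before emit() is called below.
        backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
            sharedFlow.toList(collected)
        }

        sharedFlow.emit(5)

        assertEquals(listOf(5), collected)
        // No job.cancel() here: backgroundScope is cancelled
        // automatically when the test body finishes.
    }
}
```

The collector still has to be subscribed before the emission, which is why it is launched on UnconfinedTestDispatcher rather than on the default test dispatcher.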

Upvotes: 10

Róbert Nagy

Reputation: 7622

You could use tryEmit() instead and verify the returned result.
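One caveat, sketched below under the assumption that nothing is collecting the flow yet: according to the tryEmit documentation, the call returns true when there are no subscribers, and with replay=0 the value is simply dropped. The return value therefore only shows that the emission did not have to suspend, not that anyone received it. The function name is hypothetical.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical helper: emits into a flow that has no subscribers.
fun tryEmitWithoutSubscribers(): Pair<Boolean, List<Int>> {
    val sharedFlow = MutableSharedFlow<Int>(replay = 0)

    // No collector exists, so tryEmit succeeds and the value is discarded.
    val emitted = sharedFlow.tryEmit(1)

    // replayCache stays empty because replay = 0.
    return emitted to sharedFlow.replayCache
}
```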

UPDATE:

Consider using Turbine. For example:

sharedFlow.test {
    sharedFlow.emit(1)
    assertEquals(expected = 1, expectItem())
    cancelAndIgnoreRemainingEvents()

}
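In more recent Turbine releases, expectItem() has been replaced by awaitItem(), to the best of my knowledge. A self-contained sketch of the same idea with runTest follows, assuming the app.cash.turbine artifact is on the test classpath; the class and test names are illustrative only.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

class SharedFlowTurbineTest { // hypothetical class name

    @Test
    fun emitsValue() = runTest {
        val sharedFlow = MutableSharedFlow<Int>(replay = 0)

        // test { } subscribes to the flow before running the block,
        // so the emission below is not lost.
        sharedFlow.test {
            sharedFlow.emit(1)
            assertEquals(1, awaitItem())
            cancelAndIgnoreRemainingEvents()
        }
    }
}
```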

Upvotes: 6
