Niclas

Reputation: 539

Is there a way to achieve this rx flow in Kotlin with coroutines/Flow/Channels?

I am trying out Kotlin Coroutines and Flow for the first time and I am trying to reproduce a certain flow I use on Android with RxJava with an MVI-ish approach, but I am having difficulties getting it right and I am essentially stuck at this point.

The RxJava app looks essentially like this:

MainActivityView.kt

object MainActivityView {

    sealed class Event {
        object OnViewInitialised : Event()
    }

    data class State(
        val renderEvent: RenderEvent = RenderEvent.None
    )

    sealed class RenderEvent {
        object None : RenderEvent()
        class DisplayText(val text: String) : RenderEvent()
    }
}

MainActivity.kt

MainActivity has an instance of a PublishSubject with an Event type, e.g. MainActivityView.Event.OnViewInitialised, MainActivityView.Event.OnError, etc. The initial Event is sent in onCreate() via the subject's .onNext(Event) call.

@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main) {

    @Inject
    lateinit var subscriptions: CompositeDisposable

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var onViewInitialisedSubject: PublishSubject<MainActivityView.Event.OnViewInitialised>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setupEvents()
    }

    override fun onDestroy() {
        super.onDestroy()
        subscriptions.clear()
    }

    private fun setupEvents() {
        if (subscriptions.size() == 0) {
            Observable.mergeArray(
                onViewInitialisedSubject
                    .toFlowable(BackpressureStrategy.BUFFER)
                    .toObservable()
            ).observeOn(
                Schedulers.io()
            ).compose(
                viewModel()
            ).observeOn(
                AndroidSchedulers.mainThread()
            ).subscribe(
                ::render
            ).addTo(
                subscriptions
            )

            onViewInitialisedSubject
                .onNext(
                    MainActivityView
                        .Event
                        .OnViewInitialised
                )
        }
    }

    private fun render(state: MainActivityView.State) {
        when (state.renderEvent) {
            MainActivityView.RenderEvent.None -> Unit
            is MainActivityView.RenderEvent.DisplayText -> {
                mainActivityTextField.text = state.renderEvent.text
            }
        }
    }

}

MainActivityViewModel.kt

These Events are then picked up by the MainActivityViewModel class, which is invoked via .compose(viewModel()) and transforms each received Event into a new State through an ObservableTransformer<Event, State>. The view model returns a new State carrying a renderEvent, which MainActivity then acts on in its render(state: MainActivityView.State) function.

@MainActivityScope
class MainActivityViewModel @Inject constructor(
    private var state: MainActivityView.State
) {

    operator fun invoke(): ObservableTransformer<MainActivityView.Event, MainActivityView.State> = onEvent

    private val onEvent = ObservableTransformer<MainActivityView.Event,
        MainActivityView.State> { upstream: Observable<MainActivityView.Event> ->
        upstream.publish { shared: Observable<MainActivityView.Event> ->
            Observable.mergeArray(
                shared.ofType(MainActivityView.Event.OnViewInitialised::class.java)
            ).compose(
                eventToViewState
            )
        }
    }

    private val eventToViewState = ObservableTransformer<MainActivityView.Event, MainActivityView.State> { upstream ->
        upstream.flatMap { event ->
            when (event) {
                MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
            }
        }
    }

    private fun onViewInitialisedEvent(): Observable<MainActivityView.State> {
        val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
        state = state.copy(renderEvent = renderEvent)
        return Observable.just(state) // wrap the updated state in a single-item Observable
    }

}

Could I achieve sort of the same flow with coroutines/Flow/Channels? Possibly a bit simplified even?

EDIT:

I have since found a solution that works for me, and I haven't run into any issues so far. However, this solution uses ConflatedBroadcastChannel<T>, which will eventually be deprecated. It will likely be possible to replace it with the SharedFlow API, which is not yet released at the time of writing (more on that here).

The way it works is that the Activity and the view model share a ConflatedBroadcastChannel<MainActivityView.Event>, which is used to send or offer events from the Activity (or an adapter). The view model reduces each event to a new State, which is then emitted. The Activity collects the Flow<State> returned by viewModel.invoke() and ultimately renders the emitted State.

MainActivityView.kt

object MainActivityView {

    sealed class Event {
        object OnViewInitialised : Event()
        data class OnButtonClicked(val idOfItemClicked: Int) : Event()
    }

    data class State(
        val renderEvent: RenderEvent = RenderEvent.Idle
    )

    sealed class RenderEvent {
        object Idle : RenderEvent()
        data class DisplayText(val text: String) : RenderEvent()
    }
}

MainActivity.kt

class MainActivity : AppCompatActivity(R.layout.activity_main) {

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>

    private var isInitialised: Boolean = false

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        init()
    }
    
    private fun init() {
        if (!isInitialised) {
            
            lifecycleScope.launch {
                viewModel()
                    .flowOn(
                        Dispatchers.IO
                    ).collect(::render)
            }

            eventChannel
                .offer(
                    MainActivityView.Event.OnViewInitialised
                )
            isInitialised = true
        }
    }

    private suspend fun render(state: MainActivityView.State): Unit =
        when (state.renderEvent) {
            MainActivityView.RenderEvent.Idle -> Unit
            is MainActivityView.RenderEvent.DisplayText -> 
                renderDisplayText(text = state.renderEvent.text)
            
        }

    private fun renderDisplayText(text: String) {
        // render text
    }

}

MainActivityViewModel.kt

class MainActivityViewModel constructor(
    private var state: MainActivityView.State = MainActivityView.State(),
    private val eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>,
) {

    // `operator` is what lets the Activity call viewModel(); building the Flow itself does not suspend.
    operator fun invoke(): Flow<MainActivityView.State> =
        eventChannel
            .asFlow()
            .flatMapLatest { event: MainActivityView.Event ->
                reduce(event)
            }

    private fun reduce(event: MainActivityView.Event): Flow<MainActivityView.State> =
        when (event) {
            MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
            // OnButtonClicked is a data class, so it needs an `is` check (which also smart-casts event).
            is MainActivityView.Event.OnButtonClicked -> onButtonClickedEvent(event.idOfItemClicked)
        }

    private fun onViewInitialisedEvent(): Flow<MainActivityView.State> = flow {
        val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
        state = state.copy(renderEvent = renderEvent)
        emit(state)
    }

    private fun onButtonClickedEvent(idOfItemClicked: Int): Flow<MainActivityView.State> = flow {
        // do something to handle click
        println("item clicked: $idOfItemClicked")
        emit(state)
    }

}
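
SharedFlow has since shipped in kotlinx.coroutines (from version 1.4), so the swap mentioned in the edit can be sketched roughly as below. This is only a sketch, not the original code: it assumes that replay = 1 combined with BufferOverflow.DROP_OLDEST is an acceptable stand-in for the conflated channel, and the shortened type names, the trimmed ViewModel and the demo function are made up for illustration.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Shortened, hypothetical copies of the types from MainActivityView.
sealed class Event {
    object OnViewInitialised : Event()
    data class OnButtonClicked(val idOfItemClicked: Int) : Event()
}

sealed class RenderEvent {
    object Idle : RenderEvent()
    data class DisplayText(val text: String) : RenderEvent()
}

data class State(val renderEvent: RenderEvent = RenderEvent.Idle)

// Trimmed view model: it only sees a read-only Flow of events.
class ViewModel(
    private val events: Flow<Event>,
    private var state: State = State(),
) {
    operator fun invoke(): Flow<State> = events.map { event -> reduce(event) }

    private fun reduce(event: Event): State {
        state = when (event) {
            Event.OnViewInitialised ->
                state.copy(renderEvent = RenderEvent.DisplayText("hello world"))
            is Event.OnButtonClicked ->
                state.copy(renderEvent = RenderEvent.DisplayText("clicked ${event.idOfItemClicked}"))
        }
        return state
    }
}

fun demo(): List<State> = runBlocking {
    // Assumption: replay = 1 + DROP_OLDEST keeps only the latest event,
    // similar to what the conflated channel did.
    val events = MutableSharedFlow<Event>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    val viewModel = ViewModel(events)

    // Emitted before anyone collects; the replay cache hands it to the first collector.
    events.tryEmit(Event.OnViewInitialised)

    // A SharedFlow never completes, so take(1) ends the collection for this demo.
    viewModel().take(1).toList()
}

fun main() {
    demo().forEach(::println)
}
```

In an Activity the collection would run inside lifecycleScope.launch { ... } as in the code above, and tryEmit would take the place of offer.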

Upvotes: 5

Views: 3450

Answers (3)

TrevJonez

Reputation: 959

kotlinx-coroutines-core provides a transform function.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html

It isn't quite the same as what we are used to in RxJava, but it should be usable for achieving the same result.
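
As a rough illustration of how transform could map events to states, here is a minimal sketch. The Event and State types are simplified stand-ins for the ones in the question, and toStates and collectStates are made-up names. Unlike map, the transform block may emit zero, one, or several values per incoming event.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Simplified, hypothetical stand-ins for the question's types.
sealed class Event {
    object OnViewInitialised : Event()
    data class OnButtonClicked(val idOfItemClicked: Int) : Event()
}

data class State(val text: String = "")

// Each incoming Event can produce any number of States.
fun Flow<Event>.toStates(): Flow<State> = transform { event ->
    when (event) {
        Event.OnViewInitialised -> emit(State(text = "hello world"))
        is Event.OnButtonClicked -> {
            // Two emissions for a single event, e.g. a loading state followed by a result.
            emit(State(text = "loading ${event.idOfItemClicked}"))
            emit(State(text = "clicked ${event.idOfItemClicked}"))
        }
    }
}

// Collects the states produced for a fixed pair of events.
fun collectStates(): List<State> = runBlocking {
    flowOf(Event.OnViewInitialised, Event.OnButtonClicked(7))
        .toStates()
        .toList()
}

fun main() {
    collectStates().forEach(::println)
}
```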

Upvotes: 0

luis_cortes

Reputation: 736

I think what you're looking for is the Flow version of compose and ObservableTransformer, and as far as I can tell there isn't one. What you can use instead is the let scope function, something like this:

MainActivity:

yourFlow
  .let(viewModel::invoke)
  .onEach(::render)
  .launchIn(lifecycleScope) // or viewLifecycleOwner.lifecycleScope if you're in a fragment

ViewModel:

operator fun invoke(viewEventFlow: Flow<Event>): Flow<State> = viewEventFlow.flatMapLatest { event ->
  when (event) {
    Event.OnViewInitialised -> flowOf(onViewInitialisedEvent())
  }
}
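
To make the let idea concrete without any Android classes, here is a self-contained sketch. The types are trimmed stand-ins for the ones in the question, statesFor is a made-up helper, and map is used instead of flatMapLatest only to keep the example free of experimental API.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Trimmed, hypothetical stand-ins for the question's types.
sealed class Event {
    object OnViewInitialised : Event()
}

data class State(val text: String = "")

// The view model is just a function from Flow<Event> to Flow<State>,
// which is the role ObservableTransformer<Event, State> played in Rx.
class ViewModel {
    operator fun invoke(viewEventFlow: Flow<Event>): Flow<State> =
        viewEventFlow.map { event ->
            when (event) {
                Event.OnViewInitialised -> State(text = "hello world")
            }
        }
}

fun statesFor(vararg events: Event): List<State> = runBlocking {
    val viewModel = ViewModel()
    flowOf(*events)
        .let(viewModel::invoke) // same position in the chain as .compose(viewModel())
        .toList()
}

fun main() {
    statesFor(Event.OnViewInitialised).forEach(::println)
}
```

Because let simply passes the upstream flow to the function, any (Flow<A>) -> Flow<B> can be slotted into a chain this way.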

As far as sharing a flow goes, I would watch these issues:

Dominic's answer might work for replacing the publish subjects, but I think the coroutines team is moving away from BroadcastChannel and intends to deprecate it in the near future.

Upvotes: 0

Dominic Fischer

Reputation: 1849

Your MainActivity can look something like this.

@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main) {

    @Inject
    lateinit var subscriptions: CompositeDisposable

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var onViewInitialisedChannel: BroadcastChannel<MainActivityView.Event.OnViewInitialised>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setupEvents()
    }

    override fun onDestroy() {
        super.onDestroy()
        subscriptions.clear()
    }

    private fun setupEvents() {
        if (subscriptions.size() == 0) {
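            onViewInitialisedChannel.asFlow()
                .buffer()
                .flowOn(Dispatchers.IO)
                // Map each Event to a State here (for example through the view model) before rendering.
                .onEach(::render)
                .launchIn(lifecycleScope) // cancelled with the Activity, and collects on the main thread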

            onViewInitialisedChannel
                .offer(
                    MainActivityView
                        .Event
                        .OnViewInitialised
                )
        }
    }

    private fun render(state: MainActivityView.State) {
        when (state.renderEvent) {
            MainActivityView.RenderEvent.None -> Unit
            is MainActivityView.RenderEvent.DisplayText -> {
                mainActivityTextField.text = state.renderEvent.text
            }
        }
    }

}
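
One thing worth noting about launchIn is that it returns a Job, which can take over the role the CompositeDisposable had in the Rx version. The sketch below is an assumption-laden illustration rather than Android code: runBlocking's scope stands in for a lifecycle-bound scope, a MutableSharedFlow stands in for the channel, and the trimmed types and renderedStates function are made up.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical, trimmed stand-ins for the question's types.
object OnViewInitialised

data class State(val text: String)

fun renderedStates(): List<State> = runBlocking {
    val rendered = mutableListOf<State>() // stands in for render()
    val events = MutableSharedFlow<OnViewInitialised>(replay = 1)

    // Sent before collection starts; replay = 1 keeps it for the collector.
    events.tryEmit(OnViewInitialised)

    val job = events
        .map { State(text = "hello world") } // stands in for the view model
        .onEach { rendered += it }
        .launchIn(this) // `this` stands in for a lifecycle-bound scope

    yield()      // give the collector a chance to run
    job.cancel() // roughly what subscriptions.clear() did in the Rx version
    rendered
}

fun main() {
    renderedStates().forEach(::println)
}
```

With a lifecycle-bound scope the explicit cancel is usually unnecessary, since the scope cancels its children when the owner is destroyed.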

Upvotes: 1
