Rezo Shalikashvili

Reputation: 89

Ensure sequential state updates when using the RxJava scan operator

I'm trying to implement the Redux state update pattern using RxJava:

// Event is a placeholder for whatever event type the subjects carry
val subject = PublishSubject.create<Event>()
val subject1 = PublishSubject.create<Event>()

// Multiple threads post events on subject and subject1 here, concurrently

subject.mergeWith(subject1)
    .scan(getInitState()) { state, event ->
        // state update here; the lambda returns the new state
    }
    .subscribe { state ->
        // use state here
    }

As you can see, I'm using the scan operator to maintain the state.

How can I be sure that the state updates happen sequentially even when multiple threads are producing events?

Is there some mechanism in the scan operator that makes events wait in a queue until the current state update function has finished?

What I have done:

I have successfully implemented this pattern on Android. It's easy there, because if you always do the state update on

AndroidSchedulers.mainThread()

and make the state object immutable, you are guaranteed atomic and sequential state updates. But what happens if you don't have a dedicated scheduler for state updates? What if you are not on Android?

What I have researched:

Upvotes: 0

Views: 359

Answers (1)

tonicsoft

Reputation: 1808

To force execution on a single thread, you can explicitly create a single-threaded scheduler to take the place of AndroidSchedulers.mainThread():

val singleThreadScheduler = Schedulers.single()

Even if the events are emitted on other threads, you can ensure you process them only on your single thread using observeOn:

subject.mergeWith(subject1)
    .observeOn(singleThreadScheduler)
    .scan(getInitState()) { state, event ->
        // state update here
    }
    .subscribe { state ->
        // use state here
    }
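For something you can actually run, here is a minimal sketch of the same pipeline with several producer threads. It assumes RxJava 2 (the `io.reactivex` packages). `CounterState`, `runCounter` and the "add the event to a counter" reducer are made up for illustration and are not from the question. The sketch also wraps each subject with `toSerialized()`, because a plain `PublishSubject` is not meant to have `onNext` called from several threads at once.

```kotlin
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical immutable state, for illustration only.
data class CounterState(val count: Int)

fun runCounter(threads: Int, eventsPerThread: Int): Int {
    // toSerialized() makes concurrent onNext calls on the same subject safe.
    val subject = PublishSubject.create<Int>().toSerialized()
    val subject1 = PublishSubject.create<Int>().toSerialized()

    val total = threads * eventsPerThread
    val done = CountDownLatch(1)
    val last = AtomicInteger(0)

    val disposable = subject.mergeWith(subject1)
        // Events arriving from any producer thread are handed to one thread.
        .observeOn(Schedulers.single())
        // The reducer returns a new immutable state for every event.
        .scan(CounterState(0)) { state, event -> CounterState(state.count + event) }
        .subscribe { state ->
            last.set(state.count)
            if (state.count == total) done.countDown()
        }

    // Producers alternate between the two subjects and post concurrently.
    val producers = (0 until threads).map { i ->
        thread {
            val target = if (i % 2 == 0) subject else subject1
            repeat(eventsPerThread) { target.onNext(1) }
        }
    }
    producers.forEach { it.join() }

    // Wait until the reducer has seen every event (or give up after 10 s).
    done.await(10, TimeUnit.SECONDS)
    disposable.dispose()
    return last.get()
}

fun main() {
    println(runCounter(threads = 4, eventsPerThread = 1000))
}
```

If updates were lost or interleaved, the final count would fall short of `threads * eventsPerThread`, so comparing the two is a cheap sanity check for your own reducer.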

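The difference between observeOn and subscribeOn can be pretty confusing: subscribeOn decides which thread the source is subscribed on, while observeOn switches the thread for the operators that come after it in the chain. Logging the thread id or name is a useful way to check that everything is running on the thread you expect.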
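A small sketch of that kind of logging, again assuming RxJava 2. `traceThreads` is a made-up helper name, and the exact thread names you see depend on the schedulers and the RxJava version in use.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

fun traceThreads(): List<String> {
    val log = CopyOnWriteArrayList<String>()

    Observable.fromCallable {
            // Runs on the thread chosen by subscribeOn.
            log.add("source on " + Thread.currentThread().name)
            1
        }
        .subscribeOn(Schedulers.io())
        // Everything below this line runs on the single scheduler's thread.
        .observeOn(Schedulers.single())
        .map { value ->
            log.add("map on " + Thread.currentThread().name)
            value
        }
        // blockingSubscribe delivers items on the calling thread.
        .blockingSubscribe { log.add("subscriber on " + Thread.currentThread().name) }

    return log
}

fun main() {
    traceThreads().forEach(::println)
}
```

Moving the `observeOn` call up or down the chain and re-running shows which operators it affects.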

http://reactivex.io/documentation/scheduler.html

Upvotes: 2
