Switching threads multiple times in Rx chain

Let's assume I have the following case on Android:

  1. Request list of groups from network
  2. Show some UI elements for each group
  3. Request items for each group
  4. Show UI elements for each item

I want to do this using RxJava:

webService.requestGroups()
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group);
        })
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(items -> view.showItems(items));

As you can see, there are two calls to the view, each of which must be executed on the main thread, and two calls to webService, which must be executed on a background thread.

The problem with this code is that the first call to the view is executed on a background thread, which causes an Android RuntimeException ("Only the original thread that created a view hierarchy can touch its views"). If I move .observeOn to the beginning of the chain, the second webService call is executed on the main thread.

How can I "swim" between threads multiple times in a single RxJava chain?

Upvotes: 15

Views: 8202

Answers (2)

javaxian

Reputation: 2364

Building on Samuel's answer, you could do it with an even simpler, non-nested syntax:

webService.requestGroups()
    .subscribeOn(Schedulers.io()) // the source (requestGroups) runs on the IO thread
    .observeOn(AndroidSchedulers.mainThread()) // everything below on the main thread
    .map(group -> {
        view.showGroup(group);
        return group;
    })
    .observeOn(Schedulers.io()) // everything below on the IO thread
    .flatMap(group -> {
        return webService.requestItems(group);
    })
    .toList()
    .observeOn(AndroidSchedulers.mainThread()) // everything below on the main thread
    .subscribe(items -> view.showItems(items));

Two rules of thumb here:

  1. subscribeOn dictates on which thread the observable begins executing, that is, where the source is subscribed. Its placement in the chain is irrelevant, and if it appears several times only the call closest to the source takes effect, so it should appear only once.
  2. observeOn sets the thread on which all subsequent (downstream) operators execute, until another observeOn is encountered. It may appear multiple times in the chain to move different pieces of code onto different threads, as in the example above.
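
To make the hopping pattern concrete without needing RxJava or Android on the classpath, here is a rough sketch of the same four steps using only the JDK's CompletableFuture. It is an analogy, not RxJava: the executors named "io" and "ui", the class ThreadHopSketch, and the stage labels are all made up for illustration, and unlike observeOn (which applies to everything downstream) each CompletableFuture stage has to name its executor explicitly.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Runs four dependent stages, alternating between a hypothetical "io"
    // thread and a hypothetical "ui" thread, and returns "stage@thread"
    // for each stage in the order the stages ran.
    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // stands in for subscribeOn(io): the source starts on "io"
                .supplyAsync(() -> { log.add("requestGroups@" + threadName()); return "group"; }, io)
                // stands in for observeOn(ui) followed by map(showGroup)
                .thenApplyAsync(g -> { log.add("showGroup@" + threadName()); return g; }, ui)
                // stands in for observeOn(io) followed by flatMap(requestItems)
                .thenApplyAsync(g -> { log.add("requestItems@" + threadName()); return g + "-items"; }, io)
                // stands in for observeOn(ui) followed by subscribe(showItems)
                .thenAcceptAsync(items -> log.add("showItems@" + threadName()), ui)
                .join(); // block until the last stage has finished
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    private static String threadName() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints the four stage labels with the thread each one ran on, alternating io, ui, io, ui, which is the same shape the observeOn calls produce in the chain above.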

Upvotes: 14

Samuel Peter

Reputation: 4166

From the Rx doc for SubscribeOn:

The SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

Only one SubscribeOn takes effect per chain (the one closest to the source), and it sets the starting thread. ObserveOn can be used to move from one thread to another at any point in the stream. So I think the following should do what you want:

webService.requestGroups()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(group -> {
        view.showGroup(group);
        return webService.requestItems(group)
                         .subscribeOn(Schedulers.io());
    })
    .toList()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(items -> view.showItems(items));

But in my opinion this is too complicated. I would just subscribe to the first observable, and then start a new chain for each group, like this:

webService.requestGroups()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(group -> { 
        view.showGroup(group);
        webService.requestItems(group)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(items -> view.showItems(items));
    });

Upvotes: 19
