Reputation: 11741
I have a singleton that holds a reference to my database object. What I would like to do is constraint any database operation to a single IO thread.
First I tried with the following:
class SQLSingleton{
...
public Observable<MyObject> query(final String id){
return Observable.fromCallable(() -> {
//database operations here
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
...
}
The problem with this approach is that Schedulers.io() might give a different thread from its pool for each subscriber that subscribes to my observable above (and I don't want that, I want the code to be executed on the the same thread at all times).
I then moved to the approach of holding a Scheduler myself:
class SQLSingleton{
private final Scheduler schedulerIODatabase;
public SQLSingleton(){
schedulerIODatabase = Schedulers.newThread();
}
public Observable<MyObject> query(final String id){
return Observable.fromCallable(() -> {
//database operations here
})
.subscribeOn(schedulerIODatabase)
.observeOn(AndroidSchedulers.mainThread());
}
...
}
Since I am new to RxJava (and it looks like it has a lot of pitfalls), I ask: Is there any harm in keeping that Scheduler object alive (keep in mind that SQLSingleton is a singleton, therefore that Scheduler will also be)?
Upvotes: 2
Views: 1266
Reputation: 69997
schedulerIODatabase = Schedulers.newThread();
This has no effect because newThread
hands out a fresh thread every time it is applied with subscribeOn
, similar to io()
but without thread reuse.
You can't pin and reuse a particular RxJava thread in RxJava 1.x but you can do it in RxJava 2 with its extension library component: SharedScheduler.
In RxJava 1, you have to provide your own single-threaded ExecutorService
to Schedulers.from
which then will use that single Executor
for all worker invocations. Note that you have to manually manage your ExecutorService
's lifecycle and shut it down when your app has to terminate:
ExecutorService exec = Executors.newSingleThreadedExecutor();
Scheduler singleScheduler = Schedulers.from(exec);
Observable.fromCallable(() -> {
//database operations here
})
.subscribeOn(singleScheduler)
.observeOn(AndroidSchedulers.mainThread());
// ...
exec.shutdown();
Upvotes: 3