Reputation: 14464
I have the following class, which I hold as a singleton:
public class SessionStore {
    Subject<Session, Session> subject;

    public SessionStore() {
        // BehaviorSubject replays the latest Session to each new subscriber.
        subject = new SerializedSubject<>(BehaviorSubject.create(new Session()));
    }

    public void set(Session session) {
        subject.onNext(session);
    }

    public Observable<Session> observe() {
        return subject.distinctUntilChanged();
    }
}
In the activity I observe the session and perform a network operation on each change:
private Subscription init() {
    return sessionStore
            .observe()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(new Func1<Session, Observable<Object>>() {
                @Override
                public Observable<Object> call(Session session) {
                    return retrofitService.getAThing();
                }
            })
            .subscribe(...);
}
When I subscribe to the session store, the subject emits on io() immediately, as it is a BehaviorSubject, and the subscriber runs on mainThread().
The issue comes when I call sessionStore.set(new AnotherSession()) while already subscribed to it. IMO this should execute the stream defined in init() on the io() scheduler. However, what happens instead is that the stream executes on the same thread that subject.onNext() was called on, resulting in a NetworkOnMainThreadException, as I am doing a network operation in flatMap().
Do I understand subjects wrong? Do I misuse them this way? What is the proper solution then please?
I've also tried to replace the whole subject approach with Observable.fromEmitter() in the observe() method, but surprisingly the output was the very same.
Upvotes: 3
Views: 2352
Reputation: 3083
I think you are forgetting that your Subject is also an observer, so in order to get the onNext to run on an io thread, try:
public class SessionStore {
    Subject<Session, Session> subject;

    public SessionStore() {
        subject = new SerializedSubject<>(BehaviorSubject.create(new Session()));
    }

    public void set(Session session) {
        subject.onNext(session);
    }

    public Observable<Session> observe() {
        // observeOn returns an Observable, not a Subject, so it is applied
        // here rather than assigned to the subject field in the constructor.
        return subject.observeOn(Schedulers.io()).distinctUntilChanged();
    }
}
Upvotes: 0
Reputation: 4012
Please have a look at the following passage from the book 'Reactive Programming with RxJava':
By default calling onNext() on a Subject is directly propagated to all Observer's onNext() callback methods. It is not a surprise that these methods share the same name. In a way, calling onNext() on Subject indirectly invokes onNext() on each and every Subscriber.
Let's recap: if you call onNext on a Subject from Thread-1, it will invoke onNext on the subscriber from Thread-1. subscribeOn will be disregarded for those emissions.
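To make the recap concrete, here is a minimal plain-Java sketch of the same behaviour. It does not use RxJava: MiniSubject is a hypothetical stand-in that only models the "onNext is a direct method call" part of a Subject, and the thread name "producer-1" is made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class CallerThreadDemo {

    // Hypothetical stand-in for a Subject; plain Java, not the RxJava class.
    static final class MiniSubject<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Delivery is an ordinary method call, so every observer runs on
        // whichever thread happens to call onNext.
        void onNext(T value) {
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    // Emits one value from a thread with the given name and returns the
    // name of the thread on which the observer received it.
    static String deliveryThread(String producerName) {
        MiniSubject<String> subject = new MiniSubject<>();
        AtomicReference<String> seenOn = new AtomicReference<>();
        subject.subscribe(value -> seenOn.set(Thread.currentThread().getName()));

        Thread producer = new Thread(() -> subject.onNext("session"), producerName);
        producer.start();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producer", e);
        }
        return seenOn.get();
    }

    public static void main(String[] args) {
        System.out.println("observer ran on: " + deliveryThread("producer-1"));
    }
}
```

The observer reports the producer's thread name, because nothing in the chain moved the call to another thread. In the question, the "producer" is whoever calls sessionStore.set(), which is why the stream ends up on that caller's thread.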
So first things first: on which thread will the subscription to retrofitService.getAThing() happen? I will just guess and say it is the invoking thread, which would be the thread specified in observeOn, i.e. the Android UI loop.
Every value below observeOn is shifted from thread A to thread B, as specified by the scheduler. The observeOn for the UI loop should come right before the subscribe call. Every value received in the subscriber will then arrive on the UI loop, while the work above it neither blocks the UI thread nor ends in an exception.
Please have a look at the example code and the output:
// Imports for RxJava 1.x and JUnit 4.
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Subscription;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;

class SessionStore {
    private Subject<String, String> subject;

    public SessionStore() {
        subject = BehaviorSubject.create("wurst").toSerialized();
    }

    public void set(String session) {
        subject.onNext(session);
    }

    public Observable<String> observe() {
        return subject
                .asObservable()
                .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
                .distinctUntilChanged();
    }
}

@Test
public void name() throws Exception {
    // init
    SessionStore sessionStore = new SessionStore();
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    Subscription subscribe = sessionStore
            .observe()
            .flatMap(s -> {
                // The inner Observable carries its own subscribeOn, so this
                // work runs on io() regardless of the thread that called set().
                return Observable.fromCallable(() -> {
                    System.out.println("flatMap Thread:: " + Thread.currentThread());
                    return s;
                }).subscribeOn(Schedulers.io());
            })
            .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
            .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
            .subscribe(testSubscriber); // Do UI-Stuff in subscribe
    new Thread(() -> {
        System.out.println("set on Thread:: " + Thread.currentThread());
        sessionStore.set("123");
    }).start();
    new Thread(() -> {
        System.out.println("set on Thread:: " + Thread.currentThread());
        sessionStore.set("345");
    }).start();
    boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
    Assert.assertTrue(b);
}
Output::
Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Upvotes: 7
Reputation: 20278
An operator such as observeOn affects everything downstream of it. If you call:
.observeOn(AndroidSchedulers.mainThread())
in the wrong place, the rest of the stream is executed on the specified thread.
I suggest you always add:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
at the very end of the stream:
private Subscription init() {
    return sessionStore
            .observe()
            .flatMap(new Func1<Session, Observable<Object>>() {
                @Override
                public Observable<Object> call(Session session) {
                    return retrofitService.getAThing();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);
}
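The point about placement can be modelled without RxJava. The sketch below is only an analogy: it uses CompletableFuture with two single-thread executors named "io-1" and "ui-1" (both names are made up) in place of Schedulers.io() and AndroidSchedulers.mainThread(), and a hypothetical workThread helper that reports which thread ran the "work" stage depending on whether the hop to the UI executor comes before or after it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HopPlacementDemo {

    // Single-thread executor whose only thread carries the given name.
    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(task -> new Thread(task, name));
    }

    // Returns the name of the thread that ran the "work" stage.
    // hopBeforeWork = true models a thread switch placed above the work;
    // false models a switch placed at the very end of the chain.
    static String workThread(boolean hopBeforeWork) {
        ExecutorService io = named("io-1");
        ExecutorService ui = named("ui-1");
        try {
            CompletableFuture<String> source =
                    CompletableFuture.supplyAsync(() -> "session", io);
            CompletableFuture<String> result;
            if (hopBeforeWork) {
                // Switch to "ui" first, then do the work there.
                result = source
                        .thenApplyAsync(s -> s, ui)
                        .thenApplyAsync(s -> Thread.currentThread().getName(), ui);
            } else {
                // Do the work on "io", switch to "ui" only for delivery.
                result = source
                        .thenApplyAsync(s -> Thread.currentThread().getName(), io)
                        .thenApplyAsync(name -> name, ui);
            }
            return result.join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("hop before work: " + workThread(true));
        System.out.println("hop after work:  " + workThread(false));
    }
}
```

In this model the work lands on "ui-1" when the switch precedes it and on "io-1" when the switch is the last step, which mirrors why a network call placed below observeOn(AndroidSchedulers.mainThread()) ends up on the main thread.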
Upvotes: 1