Reputation: 4456
In an Android project that uses RxJava 2, I create a Flowable like this in the onCreate of my initial activity:
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());
The implementation of the FlowableOnSubscribe is:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {

    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");
        emitter.onNext("hello");
        emitter.onComplete();
    }
}
This is the subscriber implementation:
public class MySubscriber implements Subscriber<String> {

    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}
And the action implementation is:
public class MyAction implements Action {

    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}
In my output, I'm expecting to see a log statement from onNext, but I don't see one. Instead, this is my entire output:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
This indicates that onNext never runs, and onComplete doesn't run either, yet MyAction runs successfully.
Here's what happens when I comment out the call to onNext:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
In this case onNext of course doesn't run, but at least onComplete runs.
I expected to see onComplete run in both cases, and onNext run when I call emitter.onNext. What am I doing wrong here?
Upvotes: 7
Views: 3567
Reputation: 69997
When implementing Subscriber directly, you need to issue a request manually; otherwise no data will be emitted:
@Override
public void onSubscribe(Subscription s) {
    Log.i(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
}
Alternatively, you could extend DisposableSubscriber or ResourceSubscriber.
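To see why the missing request also holds back onComplete in the first log but not in the second, here is a rough, synchronous sketch of how a buffering stage typically treats demand. It deliberately uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava types, and BufferedSource, LoggingSubscriber, and BackpressureDemo are hypothetical names invented for this illustration, not RxJava's actual implementation. The assumption it models is that items wait in a queue until requested, and completion is only delivered once that queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical stand-in for a buffering stage; a simplified model, not RxJava code.
final class BufferedSource implements Flow.Subscription {
    private final Flow.Subscriber<? super String> downstream;
    private final Queue<String> buffer = new ArrayDeque<>();
    private long requested;      // outstanding demand from the subscriber
    private boolean done;        // upstream has signalled completion
    private boolean terminated;  // onComplete delivered or subscription cancelled
    private boolean draining;    // guards against re-entrant drain calls

    BufferedSource(Flow.Subscriber<? super String> downstream) {
        this.downstream = downstream;
        downstream.onSubscribe(this);
    }

    void emit(String item) { buffer.add(item); drain(); }

    void complete() { done = true; drain(); }

    @Override
    public void request(long n) {
        if (n <= 0) return; // a real implementation would signal onError here
        // Cap the demand at Long.MAX_VALUE instead of overflowing.
        requested = (Long.MAX_VALUE - requested < n) ? Long.MAX_VALUE : requested + n;
        drain();
    }

    @Override
    public void cancel() { terminated = true; buffer.clear(); }

    private void drain() {
        if (draining || terminated) return;
        draining = true;
        try {
            // Items leave the buffer only while there is outstanding demand.
            while (!terminated && requested > 0 && !buffer.isEmpty()) {
                if (requested != Long.MAX_VALUE) requested--;
                downstream.onNext(buffer.poll());
            }
            // Completion is delivered only after every buffered item has been delivered.
            if (!terminated && done && buffer.isEmpty()) {
                terminated = true;
                downstream.onComplete();
            }
        } finally {
            draining = false;
        }
    }
}

// Records callbacks in a list; requests initialRequest items in onSubscribe (0 = never request).
final class LoggingSubscriber implements Flow.Subscriber<String> {
    final List<String> log = new ArrayList<>();
    private final long initialRequest;

    LoggingSubscriber(long initialRequest) { this.initialRequest = initialRequest; }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        log.add("onSubscribe");
        if (initialRequest > 0) s.request(initialRequest);
    }

    @Override public void onNext(String item) { log.add("onNext: " + item); }
    @Override public void onError(Throwable t) { log.add("onError"); }
    @Override public void onComplete() { log.add("onComplete"); }
}

final class BackpressureDemo {
    static List<String> run(long initialRequest, boolean emitItem) {
        LoggingSubscriber subscriber = new LoggingSubscriber(initialRequest);
        BufferedSource source = new BufferedSource(subscriber);
        if (emitItem) source.emit("hello");
        source.complete();
        return subscriber.log;
    }

    public static void main(String[] args) {
        System.out.println(run(0, true));              // [onSubscribe]
        System.out.println(run(0, false));             // [onSubscribe, onComplete]
        System.out.println(run(Long.MAX_VALUE, true)); // [onSubscribe, onNext: hello, onComplete]
    }
}
```

Under this model the three runs line up with the logs in the question: with no request, "hello" stays in the buffer and blocks completion; with nothing buffered, completion goes through even without a request; and with s.request(Long.MAX_VALUE) both callbacks arrive.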
Upvotes: 18
Reputation: 94
How are you testing it? Is it possible that your main thread exits before the Flowable has a chance to emit the result, because you are using Schedulers.io() for the subscribe thread? Also, your observeOn will not do anything as it is only used for further downstream operators.
Upvotes: 0