Héctor Valls

Reputation: 26084

PublishSubject subscriber is not receiving events

I have a class ViewModel that exposes a PublishSubject binder.

ViewModel

public class ViewModel {

    private PublishSubject<ActionsEvent> binder = PublishSubject.create();
    private Service service = createService();

    @Override
    public Observable<ActionsEvent> getBinder() {
        return binder.doOnSubscribe(initialize());
    }

    private Action0 initialize() {
        return new Action0() {
            @Override
            public void call() {
                service.getActions().subscribe(new Action1<Action>() {
                    @Override
                    public void call(Action action) {
                        Log.d(TAG, "So far, so good");
                        binder.onNext(new ActionFetchedEvent(action));
                    }
                });
            }
        };
    }

}

In the Activity, I subscribe an action to be executed each time an event is fetched.

Activity

public class MyActivity extends Activity {

    @Override
    public void onCreate(Bundle savedInstance) {
        //More code
        viewModel.getBinder().subscribe(new Action1<ActionsEvent>() {
            @Override
            public void call(ActionsEvent event) {
                Log.d(TAG, "This is not printed!!");
                paintActionInUserInterface(event.getAction());
            }
        });
    }
}

Service

public interface ActionsService {
    @GET("/actions")
    Observable<Action> getActions(); //Performs an HTTP request with Retrofit
}

ActionFetchedEvent

public class ActionFetchedEvent implements ActionsEvent {

    private Action action;

    //getters and setters

}

But the subscriber doesn't receive the event. Why?

Upvotes: 1

Views: 2392

Answers (1)

Sergej Isbrecht

Reputation: 4012

This happens because a PublishSubject only delivers items that are emitted after an observer has been subscribed, and the doOnSubscribe callback is invoked before your subscription is actually registered with the subject. The event is therefore emitted too early and your subscriber misses it. You could use a BehaviorSubject instead, which replays the most recent element to each new subscriber.
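To make the ordering concrete, here is a minimal, library-free sketch. It is a toy model and not RxJava's implementation: ToySubject, its replayLatest flag, and SubjectTimingDemo are hypothetical names invented for illustration. It only mimics the sequence "run the subscribe hook, then register the observer, then optionally replay the latest value".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: this is NOT RxJava code, just the order of the steps.
public class SubjectTimingDemo {

    /** Hypothetical stand-in for a subject with a doOnSubscribe-style hook. */
    static class ToySubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();
        // false roughly models PublishSubject, true roughly models BehaviorSubject
        private final boolean replayLatest;
        private T latest;
        private boolean hasLatest;

        ToySubject(boolean replayLatest) {
            this.replayLatest = replayLatest;
        }

        /** Remembers the value and forwards it to the observers registered so far. */
        void onNext(T value) {
            latest = value;
            hasLatest = true;
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }

        void subscribe(Runnable onSubscribeHook, Consumer<T> observer) {
            onSubscribeHook.run();        // 1. the hook runs first
            observers.add(observer);      // 2. only now is the observer registered
            if (replayLatest && hasLatest) {
                observer.accept(latest);  // 3. behavior-style replay of the last value
            }
        }
    }

    /** Emits "action" from inside the hook and returns what the observer received. */
    static List<String> run(boolean replayLatest) {
        ToySubject<String> subject = new ToySubject<>(replayLatest);
        List<String> received = new ArrayList<>();
        subject.subscribe(() -> subject.onNext("action"), received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("publish-style:  " + run(false)); // prints "publish-style:  []"
        System.out.println("behavior-style: " + run(true));  // prints "behavior-style: [action]"
    }
}
```

In the publish-style run the value is pushed while the observer list is still empty, so it is lost. In the behavior-style run the stored value is handed over once the observer has been registered.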

Could you please tell us what you want to achieve? I think you could compose the observables in a much better way than subscribing and pushing items into the subject with onNext.

Please have a look at my example. I use RxJava 2 as the environment.

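// Imports assume RxJava 2, JUnit 4 and Mockito on the classpath.
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.BehaviorSubject;
import org.junit.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ViewModelTest {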
    class ActionsEvent {
    }

    class ActionFetchedEvent extends ActionsEvent {
        public ActionFetchedEvent(ActionsEvent actionEvent) {

        }
    }

    interface Service {
        public Observable<ActionsEvent> getActions();
    }

    class MyViewModel {
        private BehaviorSubject<ActionsEvent> binder;

        private Service service;

        public MyViewModel(Service service) {
            this.service = service;
            this.binder = BehaviorSubject.create();
        }

        public Observable<ActionsEvent> getBinder() {
            return binder.doOnSubscribe(disposable -> {
                // Runs before the new observer is registered with the subject.
                service.getActions().subscribe(action -> {
                    binder.onNext(new ActionFetchedEvent(action));
                });
            });
        }
    }

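    @Test
    public void subscriberReceivesFetchedAction() throws Exception {
        Service mock = mock(Service.class);

        MyViewModel viewModel = new MyViewModel(mock);

        // The mocked service emits a single event synchronously.
        when(mock.getActions()).thenAnswer(invocation -> {
            return Observable.just(new ActionsEvent());
        });

        TestObserver<ActionsEvent> test = viewModel.getBinder().test();

        // The BehaviorSubject replays the event that was pushed during doOnSubscribe.
        test.assertValueCount(1);
    }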
}
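
The composition suggested above could be sketched without any subject at all, by mapping the service's observable directly. This is only a sketch under assumptions: it presumes RxJava 2 (the io.reactivex package) is on the classpath, and ComposedViewModelSketch, Action, Service and ViewModel here are hypothetical stand-ins that mirror the types from the question.

```java
import io.reactivex.Observable;

public class ComposedViewModelSketch {

    // Hypothetical stand-ins for the types used in the question.
    static class Action { }

    interface ActionsEvent { }

    static class ActionFetchedEvent implements ActionsEvent {
        private final Action action;

        ActionFetchedEvent(Action action) {
            this.action = action;
        }

        Action getAction() {
            return action;
        }
    }

    interface Service {
        Observable<Action> getActions();
    }

    static class ViewModel {
        private final Service service;

        ViewModel(Service service) {
            this.service = service;
        }

        // No subject involved: the subscriber is attached to the mapped
        // stream itself, so there is no window in which an event can be missed.
        Observable<ActionsEvent> getBinder() {
            return service.getActions().<ActionsEvent>map(ActionFetchedEvent::new);
        }
    }

    public static void main(String[] args) {
        // A fake service that emits one action synchronously.
        ViewModel viewModel = new ViewModel(() -> Observable.just(new Action()));
        viewModel.getBinder().test().assertValueCount(1);
    }
}
```

Note that in this form every subscription calls getActions() again. Whether that is acceptable depends on what you want to achieve, which is why I asked.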

Upvotes: 2
