Sigmund

Reputation: 788

RxJava Event Bus

With the first version of RxJava and RxAndroid, I used the following class as an event bus:

public class RxBus {
    private static RxBus instance;
    private PublishSubject<Object> subject = PublishSubject.create();

    public static RxBus instanceOf() {
        if (instance == null) {
            instance = new RxBus();
        }
        return instance;
    }

    public void setMessage(Object object) {
        subject.onNext(object);
    }

    public Observable<Object> getEvents() {
        return subject;
    }
}

After getting the instance via instanceOf, I called setMessage from any class to emit messages, and used the following code to receive them:

bus.getEvents().subscribe(new Action1<Object>() {
    @Override
    public void call(Object o) {
        if (o instanceof String) {
            //TODO
        }
    }
});

Action1 came from the rx.functions package. After migrating to RxJava 2, I can no longer import it.

What is the shortest way to use RxJava 2 as an event bus?

Upvotes: 5

Views: 1215

Answers (2)

Abdallah Alaraby

Reputation: 2249

Here's a good implementation of an event bus in RxJava 2 (the code is copied from this gist):

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class RxBus {
    private static volatile RxBus sRxBus = null;
    private PublishSubject<Object> mPublishSubject = PublishSubject.create();

    private RxBus() {
    }

    public static RxBus getInstance() {
        if (sRxBus == null) {
            synchronized (RxBus.class) {
                if (sRxBus == null) {
                    sRxBus = new RxBus();
                }
            }
        }
        return sRxBus;
    }

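    // Emits only events whose runtime class is exactly cls (subclasses are filtered out).
    public <T> Observable<T> subscribe(Class<T> cls) {
        return mPublishSubject
                .filter(o -> o.getClass().equals(cls))
                .map(cls::cast); // checked cast, avoids the unchecked (T) warning
    }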

    public void post(Object obj) {
        mPublishSubject.onNext(obj);
    }
}
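
As a rough usage sketch of the bus above: the nested Bus class is a condensed stand-in for RxBus so the example compiles on its own, and the names RxBusUsage, Bus and demo are made up for illustration. It assumes RxJava 2 (io.reactivex) is on the classpath.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class RxBusUsage {

    // Condensed, hypothetical stand-in for the RxBus above (no singleton),
    // included only so this sketch is self-contained.
    static final class Bus {
        private final PublishSubject<Object> subject = PublishSubject.create();

        <T> Observable<T> subscribe(Class<T> cls) {
            return subject
                    .filter(o -> o.getClass().equals(cls))
                    .map(cls::cast);
        }

        void post(Object obj) {
            subject.onNext(obj);
        }
    }

    static List<String> demo() {
        Bus bus = new Bus();
        List<String> received = new ArrayList<>();

        // Only String events reach this subscriber.
        Disposable subscription = bus.subscribe(String.class).subscribe(received::add);

        bus.post("hello"); // delivered
        bus.post(42);      // filtered out: not a String

        // Dispose when the listener goes away (for example in onDestroy on Android).
        subscription.dispose();
        bus.post("ignored"); // not delivered: already disposed

        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [hello]
    }
}
```

Keeping the Disposable returned by subscribe and disposing it when the listener is done avoids the bus holding on to subscribers that are no longer needed.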

Upvotes: 0

Lamorak

Reputation: 11137

In RxJava 2, Action1 has been renamed to Consumer (io.reactivex.functions.Consumer), and its method is accept instead of call.

The remaining action interfaces were named according to the Java 8 functional types. The no argument Action0 is replaced by the io.reactivex.functions.Action for the operators and java.lang.Runnable for the Scheduler methods. Action1 has been renamed to Consumer and Action2 is called BiConsumer. ActionN is replaced by the Consumer<Object[]> type declaration.

See What's different in 2.0
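
Applied to the subscriber from the question, the rename could look roughly like this. It is a minimal sketch assuming RxJava 2 (io.reactivex) on the classpath; the class name ConsumerMigration and the method collectStrings are made up for illustration, and a local PublishSubject stands in for the bus.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class ConsumerMigration {

    static List<String> collectStrings() {
        // Stand-in for the bus: RxJava 2's PublishSubject lives in io.reactivex.subjects.
        PublishSubject<Object> subject = PublishSubject.create();
        final List<String> received = new ArrayList<>();

        // RxJava 1: new Action1<Object>() { public void call(Object o) { ... } }
        // RxJava 2: new Consumer<Object>() { public void accept(Object o) { ... } }
        Disposable subscription = subject.subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                if (o instanceof String) {
                    received.add((String) o);
                }
            }
        });

        subject.onNext("first");
        subject.onNext(7); // ignored by the instanceof check
        subject.onNext("second");

        // subscribe now returns a Disposable rather than RxJava 1's Subscription.
        subscription.dispose();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collectStrings()); // prints [first, second]
    }
}
```

The rest of the RxBus class from the question only needs its imports switched to the io.reactivex packages.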

Upvotes: 6
