Reputation: 788
Using the first version of RxJava and RxAndroid, I had the following class as an EventBus:
public class RxBus {
    private static RxBus instance;
    private PublishSubject<Object> subject = PublishSubject.create();

    public static RxBus instanceOf() {
        if (instance == null) {
            instance = new RxBus();
        }
        return instance;
    }

    public void setMessage(Object object) {
        subject.onNext(object);
    }

    public Observable<Object> getEvents() {
        return subject;
    }
}
After getting the instance via instanceOf in any class, I used the setMessage method to emit messages and the following code to receive them:
bus.getEvents().subscribe(new Action1<Object>() {
    @Override
    public void call(Object o) {
        if (o instanceof String) {
            //TODO
        }
    }
});
Action1 was from the rx.functions package. Now that I am trying to migrate to RxJava 2, I cannot import it.
What is the shortest way to use RxJava 2 as an EventBus?
Upvotes: 5
Views: 1215
Reputation: 2249
Here's a good implementation of an event bus in RxJava 2 (the code is copied from this gist):
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class RxBus {
    private static volatile RxBus sRxBus = null;
    private PublishSubject<Object> mPublishSubject = PublishSubject.create();

    private RxBus() {
    }

    // Lazily creates the singleton using double-checked locking.
    public static RxBus getInstance() {
        if (sRxBus == null) {
            synchronized (RxBus.class) {
                if (sRxBus == null) {
                    sRxBus = new RxBus();
                }
            }
        }
        return sRxBus;
    }

    // Emits only the events whose runtime class is exactly cls.
    public <T> Observable<T> subscribe(Class<T> cls) {
        return mPublishSubject
                .filter(o -> o.getClass().equals(cls))
                .map(o -> (T) o);
    }

    // Publishes an event to all current subscribers.
    public void post(Object obj) {
        mPublishSubject.onNext(obj);
    }
}
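One detail of the subscribe method above is worth spelling out: the filter compares o.getClass() with cls, so it matches the exact class only, and a subscriber to a supertype will not receive events of a subclass. The following is a minimal JDK-only sketch of that difference. The names ClassFilterDemo, BaseEvent, and LoginEvent are hypothetical and exist only for this illustration; they are not part of the gist.

```java
public class ClassFilterDemo {
    // Hypothetical event types, introduced only for this illustration.
    static class BaseEvent {}
    static class LoginEvent extends BaseEvent {}

    // Same predicate as the filter in RxBus.subscribe: exact class match only.
    static boolean exactMatch(Object event, Class<?> cls) {
        return event.getClass().equals(cls);
    }

    // Alternative predicate: also accepts subclasses and implementations.
    static boolean instanceMatch(Object event, Class<?> cls) {
        return cls.isInstance(event);
    }

    public static void main(String[] args) {
        Object event = new LoginEvent();
        System.out.println(exactMatch(event, LoginEvent.class));   // true
        System.out.println(exactMatch(event, BaseEvent.class));    // false
        System.out.println(instanceMatch(event, BaseEvent.class)); // true
    }
}
```

If subscribers should also receive subclasses, RxJava 2's ofType(Class) operator is probably the closest built-in fit, since it filters by instance check and casts in one step. Whether that is preferable depends on how the bus is used.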
Upvotes: 0
Reputation: 11137
In RxJava 2, Action1 has been renamed to Consumer.

The remaining action interfaces were named according to the Java 8 functional types. The no-argument Action0 is replaced by io.reactivex.functions.Action for the operators and java.lang.Runnable for the Scheduler methods. Action1 has been renamed to Consumer and Action2 is called BiConsumer. ActionN is replaced by the Consumer<Object[]> type declaration.
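Applied to the subscription code from the question, the rename might look like the sketch below. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath. ConsumerMigrationDemo and collectStrings are hypothetical names used only for illustration, and a plain PublishSubject stands in for the question's bus.getEvents().

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class ConsumerMigrationDemo {
    // Subscribes with an RxJava 2 Consumer and collects every String event.
    static Disposable collectStrings(Observable<Object> events, final List<String> sink) {
        return events.subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) { // Action1.call(...) becomes Consumer.accept(...)
                if (o instanceof String) {
                    sink.add((String) o);
                }
            }
        });
    }

    public static void main(String[] args) {
        // Stand-in for bus.getEvents() from the question.
        PublishSubject<Object> subject = PublishSubject.create();
        List<String> received = new ArrayList<>();

        Disposable disposable = collectStrings(subject, received);
        subject.onNext("hello"); // delivered: it is a String
        subject.onNext(42);      // skipped by the instanceof check
        disposable.dispose();    // RxJava 2 returns a Disposable instead of a Subscription
        subject.onNext("late");  // not delivered: already disposed

        System.out.println(received); // [hello]
    }
}
```

Apart from the import and the method name (accept instead of call), the body of the callback stays the same. Note that subscribe now returns a Disposable, which should be disposed when the subscriber goes away.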
Upvotes: 6