Reputation: 5752
I have an API available to clients that can be simplified to this:
public class API {
public void sendEvent(Event e);
}
Event instances enter my system whenever a client calls the API (technically over Binder into a Service derivative); they are then processed, filtered, and dispatched to other internal components. I don't care about past events, only those that arrive after a subscriber subscribes. This seems like a natural fit for the Rx paradigm, which I'm just getting my feet wet with.
I need an Observable that is created once, allows multiple subscribers, and can be fed instances of Event that are then sent through the reactive pipeline to observers. A Subject seems appropriate for what I'm looking to do (in particular, this answer to this question resonated with me).
What do other RxJava users recommend?
Upvotes: 5
Views: 4410
Reputation: 12087
Try observable.share(), which under the covers calls .publish().refCount(). It will use only one underlying subscription and give you the multiple subscription behaviour you specified.
Upvotes: 1
Reputation: 39386
For example, following on my short comment:
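import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import rx.Observable.OnSubscribe;
import rx.Subscriber;

public class API implements OnSubscribe<Event> {
    // Thread-safe list: a subscription may arrive while events are being dispatched
    private final List<Subscriber<? super Event>> subscribers = new CopyOnWriteArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<? super Event> sub : subscribers) {
            sub.onNext(event);
        }
    }

    // Invoked each time an observer subscribes to Observable.create(api)
    @Override
    public void call(Subscriber<? super Event> sub) {
        subscribers.add(sub);
    }
}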
Then you probably have an instance somewhere: API api = ...
Your Observable is obtained like so: Observable.create(api);
You can then do any normal thing you would do with an Observable.
The filtering of the unsubscribed Subscribers is left as an exercise to the reader.
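One possible way to approach that exercise is to prune unsubscribed entries just before each dispatch. The sketch below deliberately avoids RxJava types so that it runs on its own: Sink is a hypothetical stand-in for Subscriber (only onNext and isUnsubscribed are modelled), and PruningDispatcher and RecordingSink are illustrative names, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for rx.Subscriber: only the two methods needed here.
interface Sink<T> {
    void onNext(T value);
    boolean isUnsubscribed();
}

// Illustrative dispatcher that drops unsubscribed sinks before each dispatch.
class PruningDispatcher<T> {
    // CopyOnWriteArrayList tolerates subscribe calls racing with dispatch.
    private final List<Sink<T>> sinks = new CopyOnWriteArrayList<>();

    void subscribe(Sink<T> sink) {
        sinks.add(sink);
    }

    void send(T value) {
        // Remove every sink that has unsubscribed since the last event.
        sinks.removeIf(Sink::isUnsubscribed);
        for (Sink<T> sink : sinks) {
            sink.onNext(value);
        }
    }

    int subscriberCount() {
        return sinks.size();
    }
}

// Simple recording sink used to exercise the dispatcher.
class RecordingSink<T> implements Sink<T> {
    final List<T> received = new ArrayList<>();
    private volatile boolean unsubscribed;

    @Override
    public void onNext(T value) {
        received.add(value);
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed;
    }

    void unsubscribe() {
        unsubscribed = true;
    }
}
```

With real RxJava subscribers the same idea would presumably check sub.isUnsubscribed() in place of the Sink method; pruning at dispatch time keeps the list from growing without needing a separate cleanup hook.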
A little more research shows that PublishSubject should help:
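import rx.Observable;
import rx.subjects.PublishSubject;

public class API {
    // Emits each event only to observers subscribed at the time of the call
    private final PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }

    // Expose a read-only view so clients cannot call onNext themselves
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}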
This way, you can subscribe to this Observable, and every time an event is sent to API, it is published to all subscribers.
Use like this:
API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
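To illustrate the "no past events" behaviour the question asks for, here is a minimal plain-Java model of a subject that multicasts to whoever is currently subscribed and keeps no history. It is only a sketch: TinySubject and HotDemo are hypothetical names, RxJava is intentionally not used so the snippet runs standalone, and the real PublishSubject also handles errors, completion, and more that this model ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for PublishSubject: multicast, no replay of old values.
class TinySubject<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    // Registers an observer and returns a Runnable that removes it again.
    Runnable subscribe(Consumer<T> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    // Delivers the value to every observer subscribed right now.
    void onNext(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}

class HotDemo {
    // Returns what an early observer and a late observer each received.
    static List<List<String>> run() {
        TinySubject<String> subject = new TinySubject<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.onNext("first");                       // only the early observer exists

        Runnable cancelLate = subject.subscribe(late::add);
        subject.onNext("second");                      // both observers receive this

        cancelLate.run();                              // late observer unsubscribes
        subject.onNext("third");                       // only the early observer again

        return Arrays.asList(early, late);
    }
}
```

In this model the early observer ends up with all three events while the late one sees only "second", which mirrors the subscribe-time semantics described in the question.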
Upvotes: 11