scorpiodawg
scorpiodawg

Reputation: 5752

In RxJava, how do I start a potentially infinite stream of events generated from an API?

I have an API available to clients that can be simplified to this:

public class API {
  public void sendEvent(Event e);
}

Event instances enter my system whenever a client calls the API (technically over Binder into a Service derivative) which are then processed, filtered and dispatched to other internal components. I don't care about past events, just those available from the time a subscriber subscribes. It seems like a natural fit for the Rx paradigm which I'm just getting my feet wet with.

I need an Observable that is created once, allows multiple subscribers, and can be fed instances of Event that are then sent through the reactive pipeline to observers. A Subject seems appropriate for what I'm looking to do (in particular, this answer to this question resonated with me).

What do other RxJava users recommend?

Upvotes: 5

Views: 4410

Answers (2)

Dave Moten
Dave Moten

Reputation: 12087

Try observable.share() which under the covers calls .publish().refCount(). It will use only one underlying subscription and give you the multiple subscription behaviour you specified.

Upvotes: 1

njzk2
njzk2

Reputation: 39386

For example, following on my short comment:

public class API implements OnSubscribe<Event> {
    private List<Subscriber<Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<Event> sub : subscribers) {
            sub.onNext(event);
        }
    }
    public void call(Subscriber<Event> sub) {
        subscribers.add(sub);
    }
}

Then you probably have an instance somewhere: API api = ...

Your Observable is obtained like so: Observable.create(api); You can then do any normal thing you would do with an Observable.

The filtering of the unsubscribed Subscribers is left as an exercise to the reader.

Edit

A little more research shows that PublishSubject should help:

public class API {
    private PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}

This way, you can subscribe to this Observable, and every time an event is sent to API, it is published to all subscribers.

Use like this:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));

Upvotes: 11

Related Questions