atok

Reputation: 5938

How to create an Observable that will merge a stream of Observables after they complete?

I'm setting up a WebSocket connection from the client side using the Jetty WebSocket Client. I'm trying to create a class that provides a stream of events in the form of an Observable.

I managed to get that by writing a POJO @WebSocket class that publishes everything to SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create()); and it all works well.

How can I make it reconnect every time the connection breaks?

I tried starting with Observable.interval and flatMap-ing to Observable<ObservableSocket.SocketEvent> connect(String url), which returns one Observable per connection.

Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
    try {
        System.out.println("Connect");
        return connect(url);
    } catch (Exception e) {
        System.out.println("Exception: " + e);
        return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
    }
});

The issue is that this creates another connection every second, whether or not the previous one has closed. How do I make flatMap wait for the inner Observable to finish?

Upvotes: 0

Views: 270

Answers (1)

lopar

Reputation: 2442

Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });

concatMap maintains a SerialSubscription and subscribes to only one inner Observable at a time, waiting for each one to terminate before subscribing to the next. The range provides the infinite signal (where, in this case, infinity ends at about 2 billion :P), and concatMap opens one connection at a time, one for each inner Observable created.
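
To see the one-at-a-time behaviour in isolation, here is a minimal sketch against RxJava 1.x. It assumes a hypothetical stand-in for connect(url): the connect(String, int) below is not the asker's method, it just fakes a connection that emits two events asynchronously and then completes, as if the socket had closed. ReconnectSketch, run and the ws://example.invalid URL are likewise made up for illustration.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;

public class ReconnectSketch {

    // Hypothetical stand-in for the question's connect(url): emits two events
    // on a background scheduler, then completes as if the socket had closed.
    static Observable<String> connect(String url, int attempt) {
        return Observable.interval(10, TimeUnit.MILLISECONDS)
                .take(2)
                .map(i -> "conn" + attempt + ":event" + i);
    }

    // Opens `attempts` fake connections strictly one after another
    // and collects every event in emission order.
    public static List<String> run(int attempts) {
        return Observable.range(0, attempts)
                // concatMap subscribes to the next inner Observable only
                // after the previous one has completed.
                .concatMap(attempt -> connect("ws://example.invalid", attempt))
                .toList()
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

With concatMap, all events of connection 0 arrive before connection 1 is even subscribed to. Swapping in flatMap would subscribe to every inner Observable right away, which is the behaviour described in the question. In real code you would probably also want a delay between attempts and some error handling on the inner Observable, since an onError from any connection would end the whole stream.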

Upvotes: 1
