Reputation: 5938
I'm setting up a WebSocket connection from the client side using the Jetty WebSocket Client. I'm trying to create a class that provides a stream of events in the form of an Observable.
I managed to get that by writing a POJO @WebSocket class that publishes everything to SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create()); and it all works well.
How can I make it reconnect every time the connection breaks?
I tried starting with Observable.interval and flatMap-ing to Observable<ObservableSocket.SocketEvent> connect(String url), which returns an Observable per connection.
    Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
    Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
        try {
            System.out.println("Connect");
            return connect(url);
        } catch (Exception e) {
            System.out.println("Exception: " + e);
            return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
        }
    });
The issue is that it creates another connection every second, whether or not the previous one is still open. How can I make flatMap wait for the inner Observable to finish?
Upvotes: 0
Views: 270
Reputation: 2442
Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });
concatMap maintains a SerialSubscription and subscribes to only one emitted observable at a time, waiting for each one to terminate before subscribing to the next. The range provides the infinite signal (where in this case, infinity ends at about 2 billion :P), and concatMap opens one connection at a time, one per inner observable created.
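To make the ordering concrete, here is a minimal plain-Java sketch of the "one at a time" behaviour described above. It does not use RxJava: it is a blocking model of the sequencing only, and `SequentialReconnect`, `Connection`, `eventsUntilClosed` and `runSequentially` are hypothetical names invented for this illustration, not part of Jetty or RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;

public class SequentialReconnect {

    /** Hypothetical stand-in for one connection: yields its events and returns once it has closed. */
    public interface Connection {
        List<String> eventsUntilClosed();
    }

    /**
     * Models the sequencing only: attempt n+1 is started strictly after
     * attempt n has terminated, so two connections never overlap.
     */
    public static List<String> runSequentially(int attempts, IntFunction<Connection> connect) {
        List<String> log = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            log.add("connect#" + attempt);
            try {
                // Blocks (in this model) until the current connection is finished.
                log.addAll(connect.apply(attempt).eventsUntilClosed());
            } catch (RuntimeException e) {
                // A failed attempt is recorded and the loop moves on to the next one.
                log.add("error#" + attempt + ": " + e.getMessage());
            }
            log.add("closed#" + attempt);
        }
        return log;
    }

    public static void main(String[] args) {
        // Each fake connection delivers two messages and then closes.
        List<String> log = runSequentially(2, attempt -> () -> Arrays.asList("msg-a", "msg-b"));
        System.out.println(log);
    }
}
```

With flatMap on an interval, by contrast, every tick would start a new connection regardless of whether the previous one had closed, which is the behaviour the question describes.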
Upvotes: 1