Auryn
Auryn

Reputation: 1176

RxJava unsubscribe on Network close

i am currently trying to unsubscribe an observable, if the network request is closed, to not further stream the data.

My Observable is created with:

Observable.interval(1, TimeUnit.SECONDS)
.map { LocalDateTime.now() }.share()

So there are multiple subscribers. But I don't know how to unsubscribe if the network is closed.

I am currently stream the data with server-sent event with vert.x to the client like this:

flow.subscribe({
  response.write("the current time is $it")
}, ::println, {
  response.end()
})

If I cancel the request from the client, the observable will continue to "stream" the data.

thanks for the help

Upvotes: 1

Views: 165

Answers (2)

Alexey
Alexey

Reputation: 7247

You can unsubscribe subscriber by calling dispose()

Disposable disposable = flow.subscribe({
  response.write("the current time is $it")
}, ::println, {
  response.end()
})

disposable.dispose();

UPDATED: custom observable

val observable: Observable<LocalDateTime> = Observable.create { emitter ->
    val timer = Timer()
    timer.schedule(timerTask {
        if (emitter.isDisposed) {//<-- cancel emmitting items if disposed
            timer.cancel()
        } else {
            emitter.onNext(LocalDateTime.now())
        }
    }, 0, 1000)

}
disposable = observable.share().subscribe { t ->
    System.out.println(" Hello World! $t");
    disposable?.dispose()//<-- here calling dispose() causes observable to stop emitting items
}

Upvotes: 1

zella
zella

Reputation: 4685

You can use takeUntil operator with response closeHandler:

router.get("/test").handler(ctx -> {
    ctx.response().setChunked(true);
    Observable.interval(1, TimeUnit.SECONDS)
            .share()
            .doFinally(() -> System.out.println("Disposed"))
            .takeUntil(Observable.create(e -> ctx.response().closeHandler(event -> e.onComplete())))
            .subscribe(l -> ctx.response().write(l.toString()));
});

Upvotes: 0

Related Questions