Reputation: 1176
i am currently trying to unsubscribe an observable, if the network request is closed, to not further stream the data.
My Observable is created with:
Observable.interval(1, TimeUnit.SECONDS)
.map { LocalDateTime.now() }.share()
So there are multiple subscribers. But I don't know how to unsubscribe if the network is closed.
I am currently stream the data with server-sent event with vert.x to the client like this:
flow.subscribe({
response.write("the current time is $it")
}, ::println, {
response.end()
})
If I cancel the request from the client, the observable will continue to "stream" the data.
thanks for the help
Upvotes: 1
Views: 165
Reputation: 7247
You can unsubscribe subscriber by calling dispose()
Disposable disposable = flow.subscribe({
response.write("the current time is $it")
}, ::println, {
response.end()
})
disposable.dispose();
UPDATED: custom observable
val observable: Observable<LocalDateTime> = Observable.create { emitter ->
val timer = Timer()
timer.schedule(timerTask {
if (emitter.isDisposed) {//<-- cancel emmitting items if disposed
timer.cancel()
} else {
emitter.onNext(LocalDateTime.now())
}
}, 0, 1000)
}
disposable = observable.share().subscribe { t ->
System.out.println(" Hello World! $t");
disposable?.dispose()//<-- here calling dispose() causes observable to stop emitting items
}
Upvotes: 1
Reputation: 4685
You can use takeUntil
operator with response closeHandler
:
router.get("/test").handler(ctx -> {
ctx.response().setChunked(true);
Observable.interval(1, TimeUnit.SECONDS)
.share()
.doFinally(() -> System.out.println("Disposed"))
.takeUntil(Observable.create(e -> ctx.response().closeHandler(event -> e.onComplete())))
.subscribe(l -> ctx.response().write(l.toString()));
});
Upvotes: 0