Reputation: 4328
I'm using ReactiveStreams to publish SSE events in Vert.x:
ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
.map(l -> String.format("Number of Customer added %s .%n",
customerRepository.findAll().size() + " "))
.buildRs();
Is there a way to cancel the subscription and stop the stream of data? Thanks
Upvotes: 0
Views: 352
Reputation: 9128
The buildRs() method returns a Publisher:
Publisher<String> publisher = ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
.map(l -> String.format("Number of Customer added %s .%n",
customerRepository.findAll().size() + " "))
.buildRs();
When you subscribe to this Publisher, you can keep a reference to the Subscription and then cancel emission when you're done:
publisher
    .subscribe(new Subscriber<String>() {
        volatile Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            // signal demand; a Publisher must not emit items before request() is called
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String s) {
            // once no more events are needed, stop the stream
            subscription.cancel();
        }

        @Override
        public void onError(Throwable throwable) {
            // handle error
        }

        @Override
        public void onComplete() {
            // handle complete
        }
    });
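To see the cancel behaviour in isolation, here is a minimal sketch that runs without Vert.x. It is an illustration under assumptions, not the setup from the question: it uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher (Java 9+) in place of org.reactivestreams and the Vert.x periodic stream, and the names CancelAfterN, LimitedSubscriber and run are made up for this example. The request/cancel contract is the same in both APIs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CancelAfterN {

    /** Hypothetical subscriber: keeps its Subscription and cancels after `limit` items. */
    static final class LimitedSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit;
        private volatile Flow.Subscription subscription;

        LimitedSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel(); // no further items will be delivered
                done.countDown();
            } else {
                subscription.request(1); // ask for the next item
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes `total` events and returns what the subscriber saw before cancelling. */
    static List<String> run(int limit, int total) throws InterruptedException {
        LimitedSubscriber subscriber = new LimitedSubscriber(limit);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= total; i++) {
                publisher.submit("event " + i);
            }
        }
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 10)); // prints [event 1, event 2, event 3]
    }
}
```

Requesting one item at a time means the subscriber never receives more than it asked for, so the cancel takes effect exactly at the limit. With request(Long.MAX_VALUE), a few items already in flight may still arrive after cancel() is called.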
Upvotes: 2