Francesco Marchioni

Reputation: 4328

How to cancel the subscription to a Reactive Stream in Vert.x


I'm using Reactive Streams to publish Server-Sent Events (SSE) in Vert.x:

ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
        .map(l -> String.format("Number of Customer added %s .%n",
                customerRepository.findAll().size() + " "))
        .buildRs();

Is there any way to cancel the subscription to this data stream? Thanks

Upvotes: 0

Views: 352

Answers (1)

tsegismont

Reputation: 9128

The buildRs() method returns a Publisher:

Publisher<String> publisher = ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
  .map(l -> String.format("Number of Customer added %s .%n",
    customerRepository.findAll().size() + " "))
  .buildRs();

When you subscribe to this Publisher, you can keep a reference to the Subscription and cancel it once you no longer need events:

publisher
  .subscribe(new Subscriber<String>() {
    volatile Subscription subscription;

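    @Override
    public void onSubscribe(Subscription subscription) {
      this.subscription = subscription;
      // signal demand: without a request(n) call, no onNext signal is delivered
      subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
      // once no more events are needed
      subscription.cancel();
    }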

    @Override
    public void onError(Throwable throwable) {
      // handle error
    }

    @Override
    public void onComplete() {
      // handle complete
    }
  });
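
If you want to try the cancel pattern without a Vert.x setup, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones. This is a stand-in, not the Vert.x or MicroProfile API: SubmissionPublisher replaces the periodic stream, and the names CancelAfterLimit, LimitedSubscriber and takeFirst are made up for illustration. The subscriber requests one item at a time and cancels once it has received the number of items it wants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; SubmissionPublisher stands in for the Vert.x stream.
final class CancelAfterLimit {

  /** Collects items and cancels the subscription once `limit` items have arrived. */
  static final class LimitedSubscriber<T> implements Flow.Subscriber<T> {
    private final int limit;
    private final List<T> received = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Flow.Subscription subscription;

    LimitedSubscriber(int limit) {
      this.limit = limit;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // ask for the first item
    }

    @Override
    public void onNext(T item) {
      received.add(item);
      if (received.size() >= limit) {
        subscription.cancel(); // no more events are needed
        done.countDown();
      } else {
        subscription.request(1); // ask for the next item
      }
    }

    @Override
    public void onError(Throwable throwable) {
      done.countDown();
    }

    @Override
    public void onComplete() {
      done.countDown();
    }
  }

  /** Publishes 1..total and returns what a subscriber limited to `limit` items saw. */
  public static List<Integer> takeFirst(int limit, int total) {
    LimitedSubscriber<Integer> subscriber = new LimitedSubscriber<>(limit);
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= total; i++) {
        publisher.submit(i); // dropped once the subscriber has cancelled
      }
    } // close() signals onComplete to subscribers that are still active
    try {
      // wait until the subscriber has cancelled or the stream has completed
      subscriber.done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return List.copyOf(subscriber.received);
  }

  public static void main(String[] args) {
    // the subscriber stops after three of the ten published items
    System.out.println(takeFirst(3, 10));
  }
}
```

Requesting one item at a time means nothing more is delivered after cancel(). With request(Long.MAX_VALUE), as in the snippet above, the Reactive Streams spec allows a few already in-flight onNext signals to arrive after cancel(), so guard against that if it matters for your use case.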

Upvotes: 2
