GhostCat

Reputation: 140427

Flow programming: subscriber and publisher to keep track of count?

I came across an article about the new Flow-related interfaces in Java 9. Example code from there:

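import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber<T> implements Subscriber<T> {
  private Subscription subscription;

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1); // a value of Long.MAX_VALUE may be considered as effectively unbounded
  }

  @Override
  public void onNext(T item) {
    System.out.println("Got : " + item);
    subscription.request(1); // ask for one more item after each delivery
  }

  @Override
  public void onError(Throwable t) { t.printStackTrace(); } // not in the excerpt; added so the class compiles

  @Override
  public void onComplete() { } // not in the excerpt; added so the class compiles
}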

As you can see, onNext() requests one new item to be pushed.

Now I am wondering:

Is the server now expected to send

In other words: when request() is called several times, do those numbers add up, or are previous requests "discarded"?

This leads to the question in the title: does the subscriber need to keep track of the items it has received, in order to avoid requesting "too many" items at some point?

Upvotes: 11

Views: 745

Answers (2)

Abacus

Reputation: 19421

Although Java 9 does not implement the Reactive Streams API itself, it offers nearly the same API and defines the corresponding behaviour.

The Reactive Streams specification defines this adding up in the following rules:

For the Publisher in 1.1:

The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber's Subscription at all times.

And for the Subscription in 3.8:

While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.

So Java adheres to the specification that was given in the Reactive Streams API.
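To see the adding up in practice, here is a minimal sketch using the JDK's own SubmissionPublisher. The class names AdditiveDemandDemo and FixedDemandSubscriber are made up for this illustration, and the same-thread executor (Runnable::run) is an assumption chosen only to keep the demo deterministic. The subscriber calls request(1) and then request(2) and never asks again, so if requests accumulate it should see three of the five published items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class AdditiveDemandDemo {

    /** Hypothetical subscriber: requests 1 + 2 items up front and never asks again. */
    static class FixedDemandSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
            subscription.request(2); // added to the first request, not a replacement for it
        }

        @Override
        public void onNext(Integer item) { received.add(item); }

        @Override
        public void onError(Throwable throwable) { throwable.printStackTrace(); }

        @Override
        public void onComplete() { }
    }

    /** Publishes five items and returns the ones the subscriber actually received. */
    static List<Integer> run() {
        FixedDemandSubscriber subscriber = new FixedDemandSubscriber();
        // Assumption for the demo: Runnable::run delivers signals on the calling thread.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // items beyond the demand stay buffered in the publisher
            }
            return new ArrayList<>(subscriber.received);
        }
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run());
    }
}
```

With the default constructor, SubmissionPublisher delivers asynchronously on the common pool, so a real program would need to wait for delivery before inspecting the list.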

Upvotes: 4

Nicolai Parlog

Reputation: 51040

As Sotirios points out, the request method's Javadoc states (emphasis mine):

Adds the given number n of items to the current unfulfilled demand for this subscription. If n is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n additional onNext invocations (or fewer if terminated).

So the answer is clearly yes, the subscriber needs to keep track of items. In fact, that's the whole point of the mechanism. Some background: The request method is meant to allow the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". It is hence the subscriber's (and only its) task to carefully vet when and how many new items to request. In that vein, it cannot "reconsider" and lower the number of items to receive.

Lowering the number would also make the communication between publisher and subscriber "non-monotonic" in the sense that the total number of requested items could suddenly drop (as it stands it can only increase). This is not only annoying in an abstract sense, it poses concrete consistency problems: The publisher could be in the process of delivering a few items when the subscriber suddenly drops the number of requested items to 1. What now?
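As a rough illustration of what "keeping track" can look like, the sketch below counts how many items are requested but not yet received and only tops up the demand once half a batch has been consumed. This is one possible bookkeeping scheme, not the prescribed one; BatchingSubscriberDemo, BatchingSubscriber, batchSize and the half-batch threshold are all assumptions made for the example, as is the same-thread executor used to keep it deterministic.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BatchingSubscriberDemo {

    /** Hypothetical subscriber that never has more than batchSize items outstanding. */
    static class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        private final long batchSize;
        private Flow.Subscription subscription;
        private long outstanding; // requested but not yet received
        long maxOutstanding;      // high-water mark, recorded only for the demo
        long receivedCount;

        BatchingSubscriber(long batchSize) { this.batchSize = batchSize; }

        private void requestMore(long n) {
            // Update the counter first, in case the publisher delivers synchronously.
            outstanding += n;
            maxOutstanding = Math.max(maxOutstanding, outstanding);
            subscription.request(n);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestMore(batchSize);
        }

        @Override
        public void onNext(T item) {
            receivedCount++;
            outstanding--;
            if (outstanding <= batchSize / 2) {
                requestMore(batchSize - outstanding); // top up to a full batch again
            }
        }

        @Override
        public void onError(Throwable throwable) { throwable.printStackTrace(); }

        @Override
        public void onComplete() { }
    }

    /** Returns { items received, highest outstanding demand } after publishing itemCount items. */
    static long[] run(long batchSize, int itemCount) {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(batchSize);
        // Assumption for the demo: Runnable::run delivers signals on the calling thread.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < itemCount; i++) {
                publisher.submit(i);
            }
        }
        return new long[] { subscriber.receivedCount, subscriber.maxOutstanding };
    }

    public static void main(String[] args) {
        long[] result = run(4, 10);
        System.out.println("received=" + result[0] + ", maxOutstanding=" + result[1]);
    }
}
```

The plain fields are safe here only because signals to a single subscriber are delivered one at a time; a subscriber that calls request() from other threads would need its own synchronization.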

Upvotes: 7
