Reputation: 140427
I came over an article regarding the new Flow
related interfaces in Java9. Example code from there:
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
As you can see, onNext()
requests one new item to be pushed.
Now I am wondering:
onSubscribe()
requested, say 5 items request(1)
is called like aboveIs the server now expected to send
In other words: when request()
is called several times, do those numbers add up; or are previous requests "discarded"?
Leading to the question title - whether the subscriber needs to keep track about received items, in order to avoid requesting "too many" items at some point.
Upvotes: 11
Views: 745
Reputation: 19421
Although Java 9 does not implement the Reactive Streams API, it offers the nearly same API and defines the corresponding behaviour.
And in the Reactive Streams specification this adding up is defined by the following:
For the Publisher in 1.1:
The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.
And for the Subscription in 3.8:
While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
So Java adheres to the specification that was given in the Reactive Streams API.
Upvotes: 4
Reputation: 51040
As Sotirios points out, the request
method's Javadoc states (emphasis mine):
Adds the given number
n
of items to the current unfulfilled demand for this subscription. Ifn
is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up ton
additional onNext invocations (or fewer if terminated).
So the answer is clearly yes, the subscriber needs to keep track of items. In fact, that's the whole point of the mechanism. Some background: The request
method is meant to allow the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". It is hence the subscriber's (and only its) task to carefully vet when and how many new items to request. In that line it can not "reconsider" and lower the number of items to receive.
Lowering the number would also make the communication between publisher and subscriber "non-monotonic" in the sense that the number of totally requested items could suddenly lower (as it stands it can only increase). This is not only annoying in an abstract sense, it poses concrete consistency problems: The publisher could be in the process of delivering a few items when the subscriber suddenly drops the number of requested items to 1 - what now?
Upvotes: 7