Reputation: 13471
I start playing with Java 9 Flow API, and the first thing that I found and dont like, it´s seems like we cannot use lambdas when we pass a subscriber implementation into the publisher as we can do with RxJava
So I have to define and implement my own Subscriber class
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
And then just pass it to my publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());
This is really verbose, and as I understand it´s because we need to set the subscription in the onSubscribe
callback
protected Flow.Subscription subscription;
To later be used in the onNext
to continue the emissions subscription.request(1);
I still dont get it why this mechanism it´s needed, but it´s avoiding the use of Lambdas as we do in RxJava as this example
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));
I guess this is not possible and I´m not missing nothing here right?
Upvotes: 4
Views: 2102
Reputation: 51060
I still dont get it why this mechanism it´s needed
The subscription enables communication from subscriber to publisher. The request
method allows the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". In order to do that, the subscriber needs to hold on to an instance of the subscription and needs to occasionally call request
to get more items.
Subscriber
If you have a use case, where you don't need to apply backpressure and would like to benefit from the reduced complexity, you could implement a LaidBackSubscriber
, which:
onSubscribe
by storing the subscription and immediately calling request
on itonNext
by executing a lambda given during construction and then calling subscription.request(1)
onError
and onComplete
by executing a lambda given during constructionThat should get you what you wanted.
The Java 9 Flow API was created as an integration point for existing async libraries, not as an invitation to implement reactive components in an ad-hoc fashion. It's great to experiment with, but if you really want to create a reactive system, the existing libraries are likely well-suited.
Upvotes: 4
Reputation: 69997
The Java 9 Flow API is a barebone set of 4 interfaces and 1 bridge class from non-reactive to reactive world. No operators, no convenience lambda versions, nothing else.
In theory, it was introduced to allow the JDK itself to build up internal components based on the reactive principles but there no reassuring signs this is happening.
Thus, the users are responsible to build up components on this API which is difficult, tedious and error-prone. You are better off waiting for the mainstream libraries to release compatible versions or just stick to the more available Reactive-Streams.Org-based libraries such as RxJava 2 and Reactor 3.
If you are still interested in building on top of the Flow API by hand, you can have a look at my research/prototype library Reactive4JavaFlow, which has the desired lambda overload implemented.
Upvotes: 2