paul

Reputation: 13471

Java 9 Flow define subscriber with lambdas

I started playing with the Java 9 Flow API, and the first thing I found that I don't like is that it seems we cannot use lambdas when passing a subscriber implementation to the publisher, as we can with RxJava.

So I have to define and implement my own Subscriber class:

import java.util.concurrent.Flow;

public class CustomSubscriber<T> implements Flow.Subscriber<T> {

    // Stored so that onNext can ask the publisher for more items.
    protected Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscription done:");
        subscription.request(1); // request the first item
    }

    @Override
    public void onNext(T item) {
        System.out.println("Got : " + item);
        subscription.request(1); // request the next item
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}

And then I just pass it to my publisher:

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());

This is really verbose and, as I understand it, that is because we need to store the subscription in the onSubscribe callback

protected Flow.Subscription subscription;     

to be used later in onNext to keep the emissions coming: subscription.request(1);

I still don't get why this mechanism is needed, but it prevents the use of lambdas the way we use them in RxJava, as in this example:

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
        e -> System.out.println("do something in the onError"),
        () -> System.out.println("Do something in the onComplete"));

I guess this is not possible, and I'm not missing anything here, right?

Upvotes: 4

Views: 2102

Answers (2)

Nicolai Parlog

Reputation: 51060

I still don't get why this mechanism is needed

The subscription enables communication from subscriber to publisher. The request method allows the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". In order to do that, the subscriber needs to hold on to an instance of the subscription and needs to occasionally call request to get more items.
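As a rough illustration of how request controls the flow, here is a minimal sketch of a subscriber that pulls items in batches. BatchingSubscriber and its batchSize parameter are made-up names for this example, not part of the JDK; the only API assumed is java.util.concurrent.Flow. The publisher may deliver at most as many items as have been requested, so the subscriber sets the pace.

```java
import java.util.concurrent.Flow;

// Hypothetical example class (not part of the JDK): asks for items in
// batches and only requests the next batch once the current one is handled.
class BatchingSubscriber<T> implements Flow.Subscriber<T> {

    private final int batchSize;
    private Flow.Subscription subscription;
    private int remainingInBatch;

    BatchingSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.remainingInBatch = batchSize;
        // Signal demand: the publisher may now send up to batchSize items.
        subscription.request(batchSize);
    }

    @Override
    public void onNext(T item) {
        System.out.println("Got: " + item);
        remainingInBatch--;
        if (remainingInBatch == 0) {
            // Current batch is processed, so ask for the next one.
            remainingInBatch = batchSize;
            subscription.request(batchSize);
        }
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}
```

If the subscriber never called request, a conforming publisher would not deliver any items, which is exactly why the subscription has to be kept around.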

No pressure Subscriber

If you have a use case where you don't need to apply backpressure and would like to benefit from the reduced complexity, you could implement a LaidBackSubscriber, which:

  • implements onSubscribe by storing the subscription and immediately calling request on it
  • implements onNext by executing a lambda given during construction and then calling subscription.request(1)
  • implements onError and onComplete by executing a lambda given during construction

That should get you what you wanted.
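The three bullet points above could be sketched roughly as follows. This is only one possible shape for such a class: LaidBackSubscriber does not exist in the JDK, and the constructor signature (two Consumers and a Runnable) and the LaidBackDemo class are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not part of the JDK): adapts three lambdas to
// Flow.Subscriber and always requests one more item, i.e. no backpressure.
class LaidBackSubscriber<T> implements Flow.Subscriber<T> {

    private final Consumer<? super T> onNext;
    private final Consumer<? super Throwable> onError;
    private final Runnable onComplete;
    private Flow.Subscription subscription;

    LaidBackSubscriber(Consumer<? super T> onNext,
                       Consumer<? super Throwable> onError,
                       Runnable onComplete) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Store the subscription and immediately ask for the first item.
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        // Run the user's lambda, then ask for the next item.
        onNext.accept(item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }

    @Override
    public void onComplete() {
        onComplete.run();
    }
}

class LaidBackDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        // SubmissionPublisher is AutoCloseable; close() signals onComplete.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new LaidBackSubscriber<String>(
                    item -> System.out.println("Got: " + item),
                    Throwable::printStackTrace,
                    done::countDown));
            publisher.submit("hello");
            publisher.submit("world");
        }
        // Items are delivered asynchronously, so wait briefly for completion.
        done.await(5, TimeUnit.SECONDS);
    }
}
```

With this in place, the call site looks close to the RxJava style from the question, just wrapped in a constructor call instead of being an overload of subscribe.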

General advice

The Java 9 Flow API was created as an integration point for existing async libraries, not as an invitation to implement reactive components in an ad-hoc fashion. It's great to experiment with, but if you really want to create a reactive system, the existing libraries are likely well-suited.

Upvotes: 4

akarnokd

Reputation: 69997

The Java 9 Flow API is a bare-bones set of 4 interfaces and 1 bridge class from the non-reactive to the reactive world. No operators, no convenience lambda versions, nothing else.

In theory, it was introduced to allow the JDK itself to build up internal components based on reactive principles, but there are no reassuring signs that this is happening.

Thus, users are responsible for building components on top of this API, which is difficult, tedious and error-prone. You are better off waiting for the mainstream libraries to release compatible versions, or just sticking with the more widely available Reactive-Streams.Org-based libraries such as RxJava 2 and Reactor 3.

If you are still interested in building on top of the Flow API by hand, you can have a look at my research/prototype library Reactive4JavaFlow, which has the desired lambda overload implemented.

Upvotes: 2
