Paul Nibin

Reactive Pull-Based BackPressure using Reactive Streams

This is my understanding of the subject.

There is a Publisher and a Subscriber.

The pseudocode for the Publisher and the Subscriber is something like this:

Publisher{
    Subscriber s;
    subscribe(Subscriber s){
        this.s = s;
        s.onSubscribe(new Subscription(){
            onRequest(int n){
                List<Message> messages = service.getMessages(n);
                s.onNext(messages);
            }

            onCancel(){
                s.onComplete();
            }
        });
    }
}

Subscriber{
    Subscription s;
    onSubscribe(Subscription s){
        this.s = s;
        s.request(5);
    }

    onNext(List<Message> messages){
        messages.stream().parallel().map(this::process).collect(toList());
        s.request(5);
    }

    onComplete(){}

    onError(e){}

    private boolean process(Message m){
        //process message and return true/false according to whether it passed/failed.
    }
}

My understanding is that the Subscriber calls request() according to the capacity of the application. When the application is healthy, the Subscriber processes messages quickly and requests more of them more often. When the application is under load, the Subscriber requests the next batch only after it has finished processing the current one, so the longer processing takes, the fewer requests for more messages are made. The flow of messages therefore follows the capacity of the application.

Is my understanding correct?

How is it different from a simple loop?

while(true){
    List<Message> messages = service.getMessages(5);
    messages.stream().parallel().map(this::process).collect(toList());
}

Here, too, the next batch of messages is read only after the current batch has been processed concurrently. And here, too, more messages are read when the application is performing well, and fewer when it is slow.

How are these two approaches different? Are the differences all about the different types of Schedulers that are available? I do not understand what exactly the advantage is here.

Update 1

OK, I now understand some advantages of the reactive pull-based approach over a simple loop.

If a Subscriber requests n items, is the Publisher required to call onNext() n times on the Subscriber? Or would it also be fine for the Publisher to call the Subscriber once with a list of n elements (as in the previous snippet)? If n onNext() calls have to be made, the Subscriber becomes a little more complex:

Publisher{
    Subscriber s;
    subscribe(Subscriber s){
        this.s = s;
        s.onSubscribe(new Subscription(){
            request(int n){
                service.getMessagesAsync(n, (List<Message> messages) -> messages.stream().forEach(s::onNext));
            }

            onCancel(){
                s.onComplete();
            }
        });
    }
}

Subscriber{
    Subscription s;
    static final int COUNT = 5;
    volatile int i = COUNT;

    onSubscribe(Subscription s){
        this.s = s;
        s.request(COUNT);
    }

    onNext(Message message){
        CompletableFuture.runAsync(() -> process(message));
        requestMessagesIfNeeded();
    }

    private synchronized void requestMessagesIfNeeded(){
        if(--i == 0){ // pre-decrement: request the next batch after exactly COUNT messages
            i = COUNT;
            s.request(COUNT);
        }
    }

    private boolean process(Message m){
        //process message and return true/false according to whether it passed/failed.
    }
}

If the Subscriber can be passed a list of n messages, there are a few other advantages as well. Suppose the Subscriber needs to acknowledge only the successfully processed messages; with the first approach, that is easy to do using a batch acknowledge API:

Map<Boolean, List<Message>> partitioned = messages.stream().parallel().collect(partitioningBy(this::process));
service.acknowledge(partitioned.get(true));
s.request(5);

With the second approach, where each onNext() call delivers a single message, this looks much more difficult to achieve.
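Receiving one message per onNext() does not rule out batch acknowledgement: the Subscriber can buffer the messages itself and work on a whole batch once it has received as many as it requested. The following is a minimal sketch, assuming Java 9+ and the JDK's java.util.concurrent.Flow interfaces (which mirror org.reactivestreams). The class BatchingAckSubscriber and its process/acknowledge parameters are hypothetical stand-ins for the question's process(Message) and service.acknowledge(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical Subscriber: receives one message per onNext() call, but processes
// and acknowledges them in batches of 'batchSize'.
final class BatchingAckSubscriber<T> implements Flow.Subscriber<T> {
    private final int batchSize;
    private final Predicate<T> process;          // true = processed successfully
    private final Consumer<List<T>> acknowledge; // batch acknowledge of the successes
    private final List<T> buffer = new ArrayList<>();
    private Flow.Subscription subscription;

    BatchingAckSubscriber(int batchSize, Predicate<T> process, Consumer<List<T>> acknowledge) {
        this.batchSize = batchSize;
        this.process = process;
        this.acknowledge = acknowledge;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(batchSize);         // ask for the first batch
    }

    @Override
    public void onNext(T item) {
        buffer.add(item);
        if (buffer.size() == batchSize) {        // the whole requested batch has arrived
            flush();
            subscription.request(batchSize);     // only now ask for the next batch
        }
    }

    @Override
    public void onError(Throwable throwable) {
        buffer.clear();                          // the partial batch stays unacknowledged
    }

    @Override
    public void onComplete() {
        flush();                                 // the last batch may be smaller than batchSize
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Same idea as the question's snippet: process in parallel, then acknowledge
        // only the messages that were processed successfully.
        Map<Boolean, List<T>> partitioned = buffer.parallelStream()
                .collect(Collectors.partitioningBy(process));
        acknowledge.accept(partitioned.get(true));
        buffer.clear();
    }
}
```

It could be attached with something like `publisher.subscribe(new BatchingAckSubscriber<>(5, this::process, service::acknowledge))`, where publisher and service are the question's hypothetical objects. Processing synchronously inside onNext(), rather than handing each message to CompletableFuture.runAsync() and requesting more right away, is what keeps the request rate tied to the actual processing rate.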

Answers (1)

akarnokd

onRequest(int n){
    List<Message> messages = service.getMessages(n);
    s.onNext(messages);
}

This is an incorrect view of Reactive Streams. request(n) tells the Publisher that it may make up to n onNext() calls, each carrying a single element. Often, that means the request call is handled by the Subscription implementation, which represents an active connection between a source and a consumer.
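To make these request semantics concrete, here is a minimal sketch (not taken from any library) of a Publisher over an Iterable that honors them: the Subscription tracks outstanding demand, makes one onNext() call per element, never emits more than was requested, and uses a work-in-progress counter so that a request() issued from inside onNext() does not recurse. It assumes Java 9+ java.util.concurrent.Flow; IterablePublisher is a hypothetical name.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal Publisher: emits the elements of an Iterable, one onNext() per
// element, and never more elements than the Subscriber has requested.
final class IterablePublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    IterablePublisher(Iterable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new IteratorSubscription<>(subscriber, source.iterator()));
    }

    // The Subscription is the "connection" between this source and one consumer,
    // and it is the object that actually handles request(n).
    private static final class IteratorSubscription<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Iterator<T> iterator;
        private final AtomicLong requested = new AtomicLong(); // outstanding demand
        private final AtomicInteger wip = new AtomicInteger(); // serializes emission
        private volatile boolean cancelled;                    // also set once terminated
        private volatile boolean badRequest;

        IteratorSubscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iterator) {
            this.subscriber = subscriber;
            this.iterator = iterator;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                badRequest = true; // reported as onError from drain()
            } else {
                // Add to the outstanding demand, capping at Long.MAX_VALUE ("unbounded").
                requested.getAndAccumulate(n, (current, extra) -> {
                    long sum = current + extra;
                    return sum < 0 ? Long.MAX_VALUE : sum;
                });
            }
            drain();
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        private void drain() {
            // If another call is already emitting (e.g. request() was called from inside
            // onNext()), only record the extra work; that call will loop again.
            if (wip.getAndIncrement() != 0) {
                return;
            }
            int missed = 1;
            do {
                if (badRequest && !cancelled) {
                    cancelled = true;
                    subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
                }
                long demand = requested.get();
                long emitted = 0;
                while (emitted != demand && !cancelled && iterator.hasNext()) {
                    subscriber.onNext(iterator.next()); // exactly one element per call
                    emitted++;
                }
                if (!cancelled && !iterator.hasNext()) {
                    cancelled = true; // terminal: no signals after onComplete
                    subscriber.onComplete();
                }
                requested.addAndGet(-emitted);
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }
    }
}
```

The wip counter is a common way to serialize signals: re-entrant request() calls only add demand, and the loop that is already running picks it up on its next pass.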

How is it different from a simple loop?

Reactive Streams allows non-blocking consumption; your example blocks a thread until getMessages() has retrieved the List of messages. The nice property of working with Publishers is that you consume events the same way whether the source Publisher is blocking or non-blocking. It gives a uniform programming model for both cases: if getMessages() one day gets a non-blocking variant, the downstream code consuming its wrapper Publisher doesn't have to change.
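A sketch of the uniform-model point, assuming Java 9+: a blocking "fetch n messages" call is wrapped in a Flow.Publisher using the JDK's SubmissionPublisher, so a Subscriber consumes it exactly as it would consume a non-blocking source. BlockingSourcePublisher and its fetch function are hypothetical, standing in for service.getMessages(n); an empty list is assumed to signal that there are no more messages.

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.IntFunction;

// Hypothetical adapter: exposes a blocking "fetch up to n messages" call as a
// Flow.Publisher. An empty batch is assumed to mean "no more messages".
final class BlockingSourcePublisher<T> implements Flow.Publisher<T> {
    private final IntFunction<List<T>> fetch;   // stand-in for service.getMessages(n)
    private final int batchSize;
    private final Executor deliveryExecutor;    // threads that run the Subscriber's callbacks

    BlockingSourcePublisher(IntFunction<List<T>> fetch, int batchSize, Executor deliveryExecutor) {
        this.fetch = fetch;
        this.batchSize = batchSize;
        this.deliveryExecutor = deliveryExecutor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Per-subscriber buffer of about batchSize items (SubmissionPublisher may round the
        // capacity up to a power of two); items are delivered only as they are requested.
        SubmissionPublisher<T> publisher = new SubmissionPublisher<>(deliveryExecutor, batchSize);
        publisher.subscribe(subscriber);
        // The blocking calls run on their own thread so they never occupy delivery threads.
        Thread fetcher = new Thread(() -> {
            try {
                List<T> batch;
                // Stop once there is no active subscriber left or the source is exhausted.
                while (publisher.hasSubscribers() && !(batch = fetch.apply(batchSize)).isEmpty()) {
                    // submit() blocks while the subscriber's buffer is full, so a slow
                    // Subscriber also slows down the blocking fetches.
                    batch.forEach(publisher::submit);
                }
                publisher.close();                 // onComplete after the buffered items
            } catch (RuntimeException e) {
                publisher.closeExceptionally(e);   // onError
            }
        }, "blocking-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }
}
```

If a non-blocking source becomes available later, only this adapter would be replaced; the Subscriber consuming it stays as it is.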

Are the differences all about the different types of Schedulers that are available?

A Scheduler represents an abstraction over an asynchronous boundary: the thread that produces events and the thread that consumes them. The different Scheduler implementations manage their threads differently, depending on how those threads are going to be used: some threads will execute computationally intensive tasks, while others will block due to interactions with non-reactive sources. The documentation of the Schedulers class describes what each of the standard implementations is for.
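The asynchronous boundary itself can be illustrated without a Scheduler library, assuming Java 9+: a SubmissionPublisher backed by an executor runs the Subscriber's onNext() calls on that executor's thread rather than on the thread calling submit(). This is roughly the role a Scheduler plays for an operator such as RxJava's observeOn(). AsyncBoundaryDemo and the thread name used here are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class AsyncBoundaryDemo {
    // A single-thread executor with a recognizable thread name; it plays the role a
    // Scheduler would play in a reactive library.
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Submits 'count' integers from the calling thread and returns the names of the
    // threads on which the Subscriber's onNext() calls actually ran.
    static List<String> consumerThreads(int count, ExecutorService consumerExecutor) {
        List<String> threadNames = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(consumerExecutor, 16)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // unbounded demand, for brevity
                }
                @Override public void onNext(Integer item) {
                    threadNames.add(Thread.currentThread().getName());
                }
                @Override public void onError(Throwable error) { done.completeExceptionally(error); }
                @Override public void onComplete() { done.complete(null); }
            });
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // called on the producer (current) thread
            }
        } // close() signals onComplete once the buffered items have been delivered
        done.join();
        return threadNames;
    }

    public static void main(String[] args) {
        ExecutorService consumer = namedExecutor("consumer-thread");
        System.out.println("producer thread: " + Thread.currentThread().getName());
        System.out.println("onNext threads:  " + consumerThreads(3, consumer));
        consumer.shutdown();
    }
}
```

Backing the consumer with a small fixed pool for CPU-bound work, or with a growable pool for work that blocks, is the same kind of choice the different Scheduler implementations encode.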
