Johan Sjöberg

Reputation: 49187

How to chain reactor subscribers

I have an existing interface chain that I want to run with Reactor instead of managing my own threads and queues:

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...;

// Works well to look up users in parallel.
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

How can I chain that result so it invokes UsersHandler with batches of User?

Upvotes: 2

Views: 1216

Answers (1)

Toerktumlare

Reputation: 14732

Subscribing to something triggers the chain, so in general you can't "chain" subscribers; they are the last thing in the chain.

Think of it this way: you set up your reactive pipeline, and when you subscribe, you trigger the pipeline to start and the chain produces a result.

In a web server, the subscriber is usually the calling client: when the client subscribes, it triggers the chain of events in the server that publishes data.
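
To make the "nothing happens until you subscribe" point concrete, here is a minimal sketch. The class name, method name, and counter are made up for illustration, and it assumes reactor-core is on the classpath. No scheduler is involved, so the whole pipeline runs on the calling thread inside subscribe().

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class LazyPipelineDemo {

    // Returns {count before subscribe, count after subscribe}.
    static int[] countBeforeAndAfterSubscribe() {
        AtomicInteger seen = new AtomicInteger();

        // Declaring the pipeline only describes the work; nothing runs yet.
        Flux<String> pipeline = Flux.just("userA", "userB", "userC")
                .doOnNext(id -> seen.incrementAndGet());

        int before = seen.get();

        // Subscribing triggers the chain.
        pipeline.subscribe();

        int after = seen.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countBeforeAndAfterSubscribe();
        System.out.println("before subscribe: " + counts[0]);
        System.out.println("after subscribe: " + counts[1]);
    }
}
```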

A Flux is a stream of 0 to n items, while a Mono carries at most one. Rather than holding "states", a Mono/Flux sends signals to its subscriber: onNext for each value, then either onComplete or onError to finish. The subscriber can also cancel.

A value is emitted as an onNext signal once it is available. For a Mono, that usually happens when whatever it wraps has resolved.

When you declare Flux.just("userA", "userB", "userC"), the values are already available, so the Flux starts emitting the strings as soon as something subscribes. All you have to do is declare the chain you want to run after someone subscribes.
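
To see what a subscriber actually receives from Flux.just, here is a small sketch that records each signal using the three-argument subscribe overload (value consumer, error consumer, completion callback). The class and method names are made up for illustration, and it assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class SignalsDemo {

    static List<String> collectSignals() {
        List<String> events = new ArrayList<>();

        // No scheduler is used, so every callback runs on the calling
        // thread before subscribe() returns.
        Flux.just("userA", "userB", "userC")
                .subscribe(
                        value -> events.add("onNext " + value),
                        error -> events.add("onError " + error),
                        () -> events.add("onComplete"));

        return events;
    }

    public static void main(String[] args) {
        collectSignals().forEach(System.out::println);
    }
}
```

The recorded list contains one onNext entry per string followed by a single onComplete; onError is never called here because nothing fails.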

This can be done in several ways. When you want to transform the value, like going from a string to a user, we usually use map.

If we just want to do something with each object and not return anything, we can use doOnNext.

Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    // map transforms each id into a User (userSvc is from the question)
    .map(userString -> userSvc.lookup(userString))
    .doOnNext(user -> {
        // side effect for each user, e.g. logging;
        // doOnNext does not change the emitted value
    })
    .subscribe();

Subscribing should be the last thing in the chain.
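
For the batching part of the question, one option (not part of the answer above, so treat it as a sketch) is to merge the parallel rails back into a single Flux with sequential() and then group items with buffer(n), which emits List batches that can be passed to the handler. The User record, the lambda implementations, and the run helper below are stand-ins invented for the example; it assumes reactor-core and Java 16+ for the record. blockLast() is used only so the demo waits for completion; in an application you would normally end with subscribe().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BatchedLookup {

    // Hypothetical stand-ins for the types in the question.
    record User(String id) {}
    interface UserLookupService { User lookup(String id); }
    interface UsersHandler { void handle(List<User> users); }

    static List<List<User>> run(List<String> ids, int batchSize) {
        UserLookupService userSvc = User::new;           // fake lookup
        List<List<User>> handled = new CopyOnWriteArrayList<>();
        UsersHandler usersHandler = handled::add;        // records each batch

        Flux.fromIterable(ids)
                .parallel(2)
                .runOn(Schedulers.parallel())
                .map(userSvc::lookup)            // lookups run on parallel rails
                .sequential()                    // merge rails back into one Flux
                .buffer(batchSize)               // group users into List batches
                .doOnNext(usersHandler::handle)  // hand each batch to the handler
                .blockLast();                    // demo only: wait for completion

        return handled;
    }

    public static void main(String[] args) {
        // Order of users across batches is not guaranteed after parallel().
        System.out.println(run(List.of("userA", "userB", "userC"), 2));
    }
}
```

With three ids and a batch size of 2, the handler is called twice: once with two users and once with the remaining one. Which users end up in which batch depends on how the parallel rails interleave.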

Upvotes: 4
