Reputation: 49187
I have an existing interface chain I want to run as a reactor instead of managing my own threads and queues
public interface UserLookupService {
public User lookup(String id);
}
public interface UsersHandler {
public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...
// Works well to lookup users in parallel.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(str -> {
userSvc.lookup(str);
});
How can I chain that result so it invokes UsersHandler
with batches of User
?
Upvotes: 2
Views: 1216
Reputation: 14732
Subscribing to something triggers the chain, so you in general can't "chain" subscribers they are the last thing in the chain.
Think if it this way, you set up your reactive pipeline, and when you subscribe
, you trigger the pipeline to start, and the chain will produce a result.
In a webserver, the subscriber
is usually the calling client, and when the client subscribes
he triggers the chain of events in the server that will publish data.
A Flux
is kind of like a list of 1 to n Mono
s. Each object in a Mono/Flux
has a number of "states" so to speak. These are Success
, Error
, Cancel
, Next
, Completed
and more.
When a Mono/Flux
internally goes into a Success
state it will emit the value in it. A Mono
usually goes Success
when something has resolved in the mono.
when you declare Flux.just("userA", "userB", "userC")
you are basically asking the flux to resolve the input you are feeding into it. Placing a string is something that will resolve instantly so flux will go into a Success
state and start emitting the strings as soon as something Subscribes
. So all you have to do is then declare the chain you want to happen after someone Subscribes
.
This can be done in several different ways, when you want to do something and change the value, like you want to from a string
to a user
we usually use map
.
If we just want to do something with each object and not return anything we can use doOnNext
.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.map(userString -> {
return lookupService.lookup(userString);
})
.doOnNext(user -> {
// if you want to do something on each user
// will return void so if you want to log something
// or handle each user
}).subscribe();
Subscribing should be the last thing in the chain.
Upvotes: 4