Lovegiver

Reputation: 451

How to iterate on a Flux to produce a new Flux and still work in a Reactive way?

I have 2 methods, respectively producing a Flux<String> and a Flux<Integer>.

public Flux<String> buildName() {
    return Flux.just(personProcessor.buildName());
}

public Flux<Integer> computeAge() {
    return Flux.just(personProcessor.computeAge());
}

These methods generate randomized values to populate a Person object.

Now I would like to iterate and create X Person objects, each with its own randomized values.

I have created a new Flux (processor is the name of the service holding the methods):

Flux<Person> persons = Flux.zip(processor.buildName(), processor.computeAge(),
                (name, age) -> Person.builder().name(name).age(age).build() );

My problem is that when I iterate with a regular loop on the "persons" Flux, I always get Person objects with the same values for name and age. I think this is expected, because my loop only subscribes to the Flux and then gets the same "first" object each time. I just don't know how to iterate on the "persons" Flux so that each iteration yields a Person object with its own name and age values.

How can I iterate X times over this Flux in order to get X Person objects with randomized values?

Of course, this iteration should also produce a Flux that will let me process the Person object.

Upvotes: 0

Views: 3940

Answers (2)

Martin Tarjányi

Reputation: 9997

Besides lkatiforis's simpler solution, you can also do the following, which resembles your original solution. This can be useful if the random values are provided in some asynchronous way.

public Flux<String> buildName() {
    return Mono.fromCallable(() -> personProcessor.buildName()).repeat();
}

public Flux<Integer> computeAge() {
    return Mono.fromCallable(() -> personProcessor.computeAge()).repeat();
}

And then the zip part is pretty much the same:

Flux<Person> persons = Flux.zip(processor.buildName(), processor.computeAge(),
        (name, age) -> Person.builder().name(name).age(age).build())
    .take(X);
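
To see why this behaves differently from the Flux.just(...) version in the question, here is a minimal sketch. It assumes reactor-core is on the classpath, and CountingSource is a hypothetical stand-in for personProcessor that returns 1, 2, 3, ... so repeated calls are easy to tell apart. Flux.just(source.next()) evaluates its argument once, when the Flux is assembled, whereas Mono.fromCallable(...) defers the call to subscription time and repeat() resubscribes after each completion.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical stand-in for personProcessor: each call returns the next integer.
class CountingSource {
    private final AtomicInteger calls = new AtomicInteger();

    int next() {
        return calls.incrementAndGet();
    }
}

class DeferredSourceSketch {

    // Mirrors the question: the value is computed once, then replayed per subscriber.
    static List<Integer> eagerTwice(CountingSource source) {
        Flux<Integer> flux = Flux.just(source.next()); // next() runs here, exactly once
        return List.of(flux.blockFirst(), flux.blockFirst());
    }

    // Mirrors this answer: the callable runs again for every emitted element.
    static List<Integer> deferred(CountingSource source, int count) {
        return Mono.fromCallable(source::next)
                .repeat()
                .take(count)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(eagerTwice(new CountingSource())); // prints [1, 1]
        System.out.println(deferred(new CountingSource(), 3)); // prints [1, 2, 3]
    }
}
```

The blocking calls (blockFirst, block) are only there to make the result easy to print; in a reactive pipeline you would keep chaining operators instead.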

Upvotes: 0

lkatiforis

Reputation: 6255

You can simplify it as follows:

Flux<Person> persons = Flux.range(1, n)
        .map(i -> Person.builder()
                .name(personProcessor.buildName())
                .age(personProcessor.computeAge())
                .build());
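
A self-contained sketch of the same idea, in case it helps to try it out. It assumes reactor-core on the classpath and Java 16+ for the record syntax. The Person record and RandomPersonProcessor are hypothetical stand-ins for the classes in the question, which uses a builder instead of a constructor.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import reactor.core.publisher.Flux;

// Hypothetical stand-in for the question's Person class.
record Person(String name, int age) {}

// Hypothetical stand-in for personProcessor: returns a fresh random value per call.
class RandomPersonProcessor {
    private static final List<String> NAMES = List.of("Ada", "Linus", "Grace", "Alan");

    String buildName() {
        return NAMES.get(ThreadLocalRandom.current().nextInt(NAMES.size()));
    }

    int computeAge() {
        return ThreadLocalRandom.current().nextInt(18, 90); // 18 inclusive, 90 exclusive
    }
}

class RangeMapSketch {

    static Flux<Person> persons(RandomPersonProcessor processor, int n) {
        // range only drives the number of iterations; the index itself is ignored.
        // The lambda runs once per element, so each Person gets its own values.
        return Flux.range(1, n)
                .map(i -> new Person(processor.buildName(), processor.computeAge()));
    }

    public static void main(String[] args) {
        persons(new RandomPersonProcessor(), 5)
                .filter(p -> p.age() >= 30) // further processing stays in the pipeline
                .subscribe(System.out::println);
    }
}
```

The result is still a Flux<Person>, so you can keep chaining operators on it rather than looping over it.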

Upvotes: 3
