Max Grigoriev
Max Grigoriev

Reputation: 1031

Flux.generate with Consumer and parallel

I have simple Flux

Flux<Long> flux = Flux.generate(
            AtomicLong::new,
            (state, sink) -> {
                long i = state.getAndIncrement();
                sink.next(i);
                if (i == 3) sink.complete();
                return state;
            }, (state) -> System.out.println("state: " + state));

Which works as expected in a single thread:

flux.subscribe(System.out::println);

The output is

0 1 2 3 state: 4

But when I switch to parallel:

flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println);

The Consumer which should print state: Number isn't invoked. I just see:

0 3 2 1

Is it a bug or expected feature?

Upvotes: 0

Views: 1624

Answers (1)

Giovanni
Giovanni

Reputation: 4015

I'm not a reactive expert but after digging into the source code it seems that the behavior is by design; it seems that creating a ParallelFlux has the side effect of blocking the call of the State Consumer; if you want to go parallel and getting the State Consumer invoked you can use:

flux.publishOn(Schedulers.elastic()).subscribe(System.out::println);

Upvotes: 1

Related Questions