dev123

Reputation: 475

Confusion about threads in Project Reactor's flatMap

I'm playing around with Project Reactor and reactive MongoDB repositories. I have the following code:

@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}

and the main @SpringBootApplication class:

@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> {
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        })
                        //.publishOn(single)
                        .flatMap(savedPerson -> {
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        })
                        //.publishOn(single)
                        .flatMap(foundPerson -> {
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        })
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}

The Flux::subscribeOn method description says that:

As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn

which is a little confusing to me, because when I don't have any publishOn in the processing chain, the printed thread names are:

Saving person from thread single-scheduler-1 - as expected

Finding person from thread Thread-13

Finding person from thread Thread-6

Finding person from thread Thread-15

Deleting person from thread Thread-6

Deleting person from thread Thread-5

Deleting person from thread Thread-4

I don't understand why. Shouldn't the scheduler specified in the subscribeOn method be used for each flatMap execution?

When I uncomment the publishOn lines, everything is executed by the given single scheduler, as expected.

Could anyone explain why the single scheduler isn't used by the flatMap operations when there is no publishOn?

Upvotes: 5

Views: 2740

Answers (1)

Michael Berry

Reputation: 72254

This contrived example may make it clearer:

Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> {
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
                "Subscription from thread %s", Thread.currentThread().getName())));

Which will give something similar to:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4

In other words, your reactive repository publishes its results on its own threads rather than on your scheduler, which is why you see this behaviour. "Up to the next occurrence of publishOn" doesn't only mean the next time your own code calls publishOn(). The switch can also happen inside any of the publishers returned from your flatMap() calls, which you have no control over.
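To see both halves of this side by side, here is a minimal sketch that needs only reactor-core on the classpath. The names PublishOnDemo, fakeRepositoryCall and the "fake-driver" scheduler are made up for illustration. The fake call merely stands in for a repository method that emits on a thread it owns, so it is not how Spring Data actually schedules its work. The flag switches on a publishOn(single) after the first flatMap, which is what uncommenting the lines in the question does.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class PublishOnDemo {

    // Hypothetical stand-in for a reactive repository call:
    // it emits its value on a thread owned by another scheduler.
    static Mono<String> fakeRepositoryCall(String value, Scheduler driver) {
        return Mono.just(value).publishOn(driver);
    }

    // Returns the names of the threads that ran the two flatMap lambdas, in order.
    static List<String> run(boolean hopBack) {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        Scheduler driver = Schedulers.newParallel("fake-driver", 2); // assumed name
        List<String> threads = new CopyOnWriteArrayList<>();
        try {
            Flux<String> saved = Flux.just("Bob")
                    .flatMap(p -> {
                        // Runs on the subscribeOn thread.
                        threads.add(Thread.currentThread().getName());
                        return fakeRepositoryCall(p, driver);
                    });

            // Optionally switch back to our scheduler after the inner publisher emitted.
            Flux<String> afterSave = hopBack ? saved.publishOn(single) : saved;

            afterSave
                    .flatMap(p -> {
                        // Runs on whichever thread delivered the previous onNext.
                        threads.add(Thread.currentThread().getName());
                        return fakeRepositoryCall(p, driver);
                    })
                    .subscribeOn(single)
                    .blockLast(); // block only so the demo can return its result
        } finally {
            single.dispose();
            driver.dispose();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("without publishOn: " + run(false));
        System.out.println("with publishOn:    " + run(true));
    }
}
```

Without the extra publishOn, the second lambda runs on a fake-driver thread, because that is where the inner publisher emitted. With it, the second lambda is back on the single scheduler. subscribeOn only decides where the subscription (and so the first emission) starts, and every later publishOn, yours or a library's, moves the signals downstream of it.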

Upvotes: 5
