Reputation: 475
I'm playing around with Project Reactor and reactive MongoDB repositories. I have the following code:
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
and the main @SpringBootApplication class:
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {
    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
            Flux.just(Person.builder()
                    .id(i)
                    .name("PersonName")
                    .build())
                .flatMap(personToSave -> {
                    System.out.println(String.format(
                        "Saving person from thread %s", Thread.currentThread().getName()));
                    return reactivePersonRepository.save(personToSave);
                })
                //.publishOn(single)
                .flatMap(savedPerson -> {
                    System.out.println(String.format(
                        "Finding person from thread %s", Thread.currentThread().getName()));
                    return reactivePersonRepository.findById(savedPerson.getId());
                })
                //.publishOn(single)
                .flatMap(foundPerson -> {
                    System.out.println(String.format(
                        "Deleting person from thread %s", Thread.currentThread().getName()));
                    return reactivePersonRepository.deleteById(foundPerson.getId());
                })
                //.publishOn(single)
                .subscribeOn(single)
                .subscribe(aVoid -> System.out.println(String.format(
                    "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}
The Flux::subscribeOn method description says:
"As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn."
This is a little confusing to me, because when I don't have any publishOn specified in the processing chain, the printed thread names are:
Saving person from thread single-scheduler-1 - as expected
Finding person from thread Thread-13
Finding person from thread Thread-6
Finding person from thread Thread-15
Deleting person from thread Thread-6
Deleting person from thread Thread-5
Deleting person from thread Thread-4
I don't understand why. Shouldn't the scheduler specified in subscribeOn be used for each flatMap execution?
When I uncomment the publishOn lines, everything is executed by the given single scheduler, as expected.
Could anyone explain why the single scheduler isn't used by the flatMap operations when there is no publishOn?
Upvotes: 5
Views: 2740
Reputation: 72254
This contrived example may make it clearer:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
    .flatMap(x -> {
        System.out.println(String.format(
            "Saving person from thread %s", Thread.currentThread().getName()));
        return Mono.just(x).publishOn(Schedulers.elastic());
    })
    .flatMap(x -> {
        System.out.println(String.format(
            "Finding person from thread %s", Thread.currentThread().getName()));
        return Mono.just(x).publishOn(Schedulers.elastic());
    })
    .flatMap(x -> {
        System.out.println(String.format(
            "Deleting person from thread %s", Thread.currentThread().getName()));
        return Mono.just(x).publishOn(Schedulers.elastic());
    })
    .subscribeOn(single)
    .subscribe(aVoid -> System.out.println(String.format(
        "Subscription from thread %s", Thread.currentThread().getName())));
Which will give something similar to:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
In other words, your reactive repository isn't publishing on the same scheduler, and this is why you see the behaviour you do. Everything downstream of a flatMap() runs on whichever thread the inner publisher emits on. "Up until the next occurrence of publishOn()" doesn't only mean the next time your own code calls publishOn(); the switch can also happen inside any of the publishers returned from your flatMap() calls, which you have no control over.
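The same thread hop can be reproduced without Reactor or MongoDB. The following is only a plain-JDK analogy, not Reactor code: ThreadHopDemo, fakeRepositoryCall and the "driver-1" thread name are hypothetical stand-ins for the chain, the repository and the threads a driver might emit on. It is a rough sketch of the idea that a continuation runs on whichever thread delivers the result, unless something explicitly hops back.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-JDK analogy; every name here is hypothetical and none of it is Reactor or Spring Data API.
class ThreadHopDemo {

    // Stand-in for a reactive repository call: the result is delivered on the driver's own thread.
    static void fakeRepositoryCall(String value, ExecutorService driver, Consumer<String> onResult) {
        driver.execute(() -> onResult.accept(value));
    }

    static String step(String label) {
        return label + " on " + Thread.currentThread().getName();
    }

    // hopBack = false mimics subscribeOn only; hopBack = true mimics a publishOn(single) after the call.
    static List<String> run(boolean hopBack) {
        ExecutorService single = Executors.newSingleThreadExecutor(r -> new Thread(r, "single-scheduler-1"));
        ExecutorService driver = Executors.newSingleThreadExecutor(r -> new Thread(r, "driver-1"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Like subscribeOn(single): the chain starts on the single thread.
            single.execute(() -> {
                log.add(step("Saving"));
                fakeRepositoryCall("Bob", driver, saved -> {
                    // This callback runs on the driver thread, whatever thread the chain started on.
                    Runnable next = () -> {
                        log.add(step("Finding"));
                        done.countDown();
                    };
                    if (hopBack) {
                        single.execute(next); // explicit hop back, like publishOn(single)
                    } else {
                        next.run();           // stay on the thread that delivered the result
                    }
                });
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
            driver.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run(false).forEach(System.out::println); // Saving on single-scheduler-1, Finding on driver-1
        run(true).forEach(System.out::println);  // Saving on single-scheduler-1, Finding on single-scheduler-1
    }
}
```

With hopBack set to false, only the first step stays on the single thread, which matches the output in the question. With hopBack set to true, the next step is handed back to the single thread, which is roughly what the commented-out publishOn(single) lines do.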
Upvotes: 5