Reputation: 21
I am exploring Spring Data with r2dbc. I am trying to use a custom repository method in my interface extended from ReactiveCrudRepository, which takes a Flux of Object as Parameter and Return a Flux of the same Object.
@Query("SELECT id, name FROM Person WHERE id = ?")
Flux<Person> myMethod(Flux<Person> person);
I am expecting it to have the same behaviour as the Flux findAllById(Publisher idStream) method.
This is my interface repository definition:
public interface PersonRepository extends ReactiveCrudRepository<Person, Person> {
@Query("SELECT id, name FROM Person WHERE id = ?")
Flux<Person> myMethod(Flux<Person> person);
}
This is my entity:
@Getter
@Setter
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "Person", schema = "mySchema")
public class Person {
@Id
@Column("id")
Long id;
@Column("name")
String name;
}
This my DB Configuration class:
@Configuration
@EnableR2dbcRepositories
@EnableR2dbcAuditing
@EnableTransactionManagement
public class DatabaseConfiguration extends AbstractR2dbcConfiguration {
......
}
This is my main application:
@SpringBootApplication
@EnableR2dbcAuditing
@EnableConfigurationProperties({ApplicationProperties.class})
public class MyApplication {
private static final Logger logger = LoggerFactory.getLogger(MyApplication.class);
@Bean
public CommandLineRunner consume(PersonRepository personRepository) {
LongSupplier randomLong = () -> {
return RandomUtils.nextLong(10L,20L);
};
Flux<Long> personIds = Flux.fromStream(LongStream.generate(randomLong).boxed());
Flux<Person> persons = personIds.map( p -> {
Person person = new Person();
person.setId(p);
return person;
});
personRepository.myMethod(persons.take(3)).subscribe( id -> logger.info("Processed Person Id: " + id));
return null;
}
.......
}
When using the interface method findById(Publisher id), it works, but when trying to use the custom method myMethod(Flux id) with quetry annotation I am getting the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Value must not be null
Caused by: java.lang.IllegalArgumentException: **Value must not be null**
at org.springframework.util.Assert.notNull(Assert.java:201)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
reactor.core.publisher.Mono.map(Mono.java:3411)
org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
Error has been observed at the following site(s):
*__________Mono.map ⇢ at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
|_ Mono.flatMapMany ⇢ at org.springframework.data.r2dbc.repository.query.AbstractR2dbcQuery.execute(AbstractR2dbcQuery.java:88)
*____Flux.usingWhen ⇢ at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.decorate(RepositoryMethodInvoker.java:242)
Original Stack Trace:
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.r2dbc.core.Parameter.from(Parameter.java:54)
at org.springframework.data.r2dbc.query.QueryMapper.getBindValue(QueryMapper.java:426)
at org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy.getBindValue(DefaultReactiveDataAccessStrategy.java:288)
at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.getBindValue(ExpressionEvaluatingParameterBinder.java:134)
at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bindExpressions(ExpressionEvaluatingParameterBinder.java:82)
at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bind(ExpressionEvaluatingParameterBinder.java:72)
at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery$ExpandedQuery.<init>(StringBasedR2dbcQuery.java:197)
at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.lambda$createQuery$0(StringBasedR2dbcQuery.java:156)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:276)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:148)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:94)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
at reactor.core.publisher.Flux.subscribe(Flux.java:8306)
I couldn't find some similar use case in the Spring reference documenetation, I am wondering if this is even supported.
Upvotes: 1
Views: 5252
Reputation: 733
I checked the @Query
annotation with an incoming and outgoing Flux
, and it appear that the @Query
annotation indeed does not support an incoming Publisher
.
I also got Value must not be null
from my test code below:
Flux<Integer> ids = Flux.range(1, 10);
userInfoRepository.findAllById(ids)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(v1 -> log.info("User {}", v1.getUsername()));
@Query("select * from user.user_infos t where t.id = :id")
Flux<UserInfo> findAllById(Flux<Integer> id);
But if you remove @Query
it actually works:
Flux<UserInfo> findAllById(Flux<Integer> id);
And with other fields too:
Flux<UserInfo> findAllByUsername(Flux<String> username);
To me it seems like the @Query
annotation does not support subscribing to an incoming argument Publisher
at this point (i.e. it does not do .flatMap()
internally).
Upvotes: 0