Halim CHAIBI

Reputation: 21

Spring Data with r2dbc using Query annotation

I am exploring Spring Data with R2DBC. I am trying to add a custom method to my repository interface, which extends ReactiveCrudRepository. The method takes a Flux of an object as its parameter and returns a Flux of the same object.

    @Query("SELECT id, name  FROM Person WHERE id = ?")
    Flux<Person> myMethod(Flux<Person> person);

I expect it to behave like the Flux<T> findAllById(Publisher<ID> idStream) method.

This is my interface repository definition:

public interface PersonRepository extends ReactiveCrudRepository<Person, Person> {

    @Query("SELECT id, name  FROM Person WHERE id = ?")
    Flux<Person> myMethod(Flux<Person> person);

}

This is my entity:

@Getter
@Setter
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "Person", schema = "mySchema")
public class Person {
    @Id
    @Column("id")
    Long id;

    @Column("name")
    String name;

}

This is my DB configuration class:

@Configuration
@EnableR2dbcRepositories
@EnableR2dbcAuditing
@EnableTransactionManagement
public class DatabaseConfiguration extends AbstractR2dbcConfiguration {

......

}

This is my main application:

@SpringBootApplication
@EnableR2dbcAuditing
@EnableConfigurationProperties({ApplicationProperties.class})
public class MyApplication {

    private static final Logger logger = LoggerFactory.getLogger(MyApplication.class);


    @Bean
    public CommandLineRunner consume(PersonRepository personRepository) {


        LongSupplier randomLong = () -> {
            return RandomUtils.nextLong(10L,20L);
        };

        Flux<Long> personIds = Flux.fromStream(LongStream.generate(randomLong).boxed());
        Flux<Person> persons = personIds.map( p -> {
            Person person = new Person();
            person.setId(p);
            return person;
        });


        personRepository.myMethod(persons.take(3)).subscribe( id -> logger.info("Processed Person Id: " + id));
        
        return null;
    }   

    .......
}

When I use the interface method findById(Publisher<ID> id), it works, but when I use the custom method myMethod(Flux<Person> person) with the @Query annotation, I get the following exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Value must not be null
Caused by: java.lang.IllegalArgumentException: Value must not be null
    at org.springframework.util.Assert.notNull(Assert.java:201)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
    reactor.core.publisher.Mono.map(Mono.java:3411)
    org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
Error has been observed at the following site(s):
    *__________Mono.map ⇢ at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
    |_ Mono.flatMapMany ⇢ at org.springframework.data.r2dbc.repository.query.AbstractR2dbcQuery.execute(AbstractR2dbcQuery.java:88)
    *____Flux.usingWhen ⇢ at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.decorate(RepositoryMethodInvoker.java:242)
Original Stack Trace:
        at org.springframework.util.Assert.notNull(Assert.java:201)
        at org.springframework.r2dbc.core.Parameter.from(Parameter.java:54)
        at org.springframework.data.r2dbc.query.QueryMapper.getBindValue(QueryMapper.java:426)
        at org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy.getBindValue(DefaultReactiveDataAccessStrategy.java:288)
        at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.getBindValue(ExpressionEvaluatingParameterBinder.java:134)
        at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bindExpressions(ExpressionEvaluatingParameterBinder.java:82)
        at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bind(ExpressionEvaluatingParameterBinder.java:72)
        at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery$ExpandedQuery.<init>(StringBasedR2dbcQuery.java:197)
        at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.lambda$createQuery$0(StringBasedR2dbcQuery.java:156)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:276)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:148)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
        at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:94)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
        at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8306)

I couldn't find a similar use case in the Spring reference documentation, and I am wondering whether this is even supported.

Upvotes: 1

Views: 5252

Answers (1)

J011195

Reputation: 733

I tested the @Query annotation with an incoming and an outgoing Flux, and it appears that @Query indeed does not support an incoming Publisher.

I also got Value must not be null from my test code below:

Flux<Integer> ids = Flux.range(1, 10);

userInfoRepository.findAllById(ids)
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(v1 -> log.info("User {}", v1.getUsername()));

@Query("select * from user.user_infos t where t.id = :id")
Flux<UserInfo> findAllById(Flux<Integer> id);

But if you remove @Query it actually works:

Flux<UserInfo> findAllById(Flux<Integer> id);

And with other fields too:

Flux<UserInfo> findAllByUsername(Flux<String> username);

To me it seems that the @Query annotation does not currently subscribe to a Publisher passed as an argument (i.e. it does not call .flatMap() internally).
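
One possible workaround, as a minimal sketch only: keep the @Query method single-valued and do the flat-mapping yourself in a default method on the repository. This assumes the repository ID type is Long (not Person as in the question), that the Person entity from the question is in the same package, and that the query uses a named parameter (:id). The method name findOneById is hypothetical and not part of Spring Data.

```java
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

// Assumption: Person is the entity from the question, with a Long id
// and a getId() accessor (generated by Lombok there).
public interface PersonRepository extends ReactiveCrudRepository<Person, Long> {

    // Hypothetical single-valued query method: @Query receives a plain
    // value to bind, not a Publisher.
    @Query("SELECT id, name FROM Person WHERE id = :id")
    Flux<Person> findOneById(Long id);

    // Subscribes to the incoming Flux here, outside of @Query, and runs
    // the annotated query once per emitted element.
    default Flux<Person> myMethod(Flux<Person> persons) {
        return persons
                .map(Person::getId)
                .concatMap(this::findOneById);
    }
}
```

With this in place, the call from the question, personRepository.myMethod(persons.take(3)), should no longer hand a Publisher to the parameter binder. concatMap keeps the results in the order of the incoming elements; flatMap could be used instead if the lookups may run concurrently and ordering does not matter.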

Upvotes: 0
