Reputation: 3657
I have a micro service which reads objects from a database using a ReactiveMongoRepository
interface.
The goal is to take each one of those objects and push it to a AWS Lambda function (after converting it to a DTO). If the result of that lambda function is in the 200 range, mark the object as being a success otherwise ignore.
In the old days of a simple Mongo Repository and a RestTemplate this is would be a trivial task. However I'm trying to understand this Reactive deal, and avoid blocking.
Here is the code I've come up with, I know I'm blocking on the webClient
, but how do I avoid that?
@Override
public Flux<Video> index() {
return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
// Blocking call
final HttpStatus httpStatus = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.block()
.statusCode();
if (httpStatus.is2xxSuccessful()) {
video.setIndexed(true);
}
return videoRepository.save(video);
});
}
I'm calling the above from a scheduled task, and I don't really care about that actual result of the index() method, just what happens during.
@Scheduled(fixedDelay = 60000)
public void indexTask() {
indexService
.index()
.log()
.subscribe();
}
I've read a bunch of blog posts etc on the subject but they're all just simple CRUD operations without anything happening in the middle so don't really give me a full picture of how to implement these things.
Any help?
Upvotes: 0
Views: 273
Reputation: 1602
my approach, maybe a little bit more readable. But I admit I didn't run it so not 100% guarantee that it will work.
public Flux<Video> index() {
return videoRepository.findAll()
.flatMap(this::callLambda)
.flatMap(videoRepository::save);
}
private Mono<Video> callLambda(final Video video) {
SearchDTO searchDTO = new SearchDTO(video);
return webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO))
.exchange()
.map(ClientResponse::statusCode)
.filter(HttpStatus::is2xxSuccessful)
.map(t -> {
video.setIndexed(true);
return video;
});
}
Upvotes: 0
Reputation: 59086
Your solution is actually quite close. In those cases, you should try and decompose the reactive chain in steps and not hesitate to turn bits into independent methods for clarity.
@Override
public Flux<Video> index() {
Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
return unindexedVideos.flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
Mono<ClientResponse> indexedResponse = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.filter(res -> res.statusCode().is2xxSuccessful());
return indexedResponse.flatMap(response -> {
video.setIndexed(true);
return videoRepository.save(video);
});
});
Upvotes: 1