Reputation: 288
I have a paginated API that returns data for the last 12 months. The response of the API looks like this:
public class PagedTransfersDto {
    private List<Transfer> content;
    private Page page;

    @Getter
    public static class Transfer {
        private String id;
        private Long transferId;
        private Long transferRequestId;
        private String status;
        private BigDecimal accountReceivable;
        private BigDecimal accountPayable;
        private BigDecimal netReceivable;
        private BigDecimal netPayable;
        private String currency;
        private Long transferDate;
    }

    @Getter
    public static class Page {
        private Integer size;
        private Integer number;
        private Integer totalElements;
        private Integer totalPages;
    }
}
Now I have to collect all the data, calculate the sum of all the netReceivable values, and return it as a Mono<CompanyIncome>. This POJO looks like this:
public class CompanyIncome {
    private BigDecimal inferredIncome = new BigDecimal(0);
}
To do this I have written something like:
CompanyIncome initialIncome = new CompanyIncome();
return myService.getTransfers(0, 50, fromDate, toDate)
    .expand(pagedTransfersDto -> {
        if (pagedTransfersDto.getPage().getNumber().equals(pagedTransfersDto.getPage().getTotalPages())) {
            return Mono.empty();
        }
        return myService.getTransfers(pagedTransfersDto.getPage().getNumber() + 1, 50, fromDate, toDate);
    })
    .flatMap(pagedTransfersDto -> Flux.fromIterable(pagedTransfersDto.getContent()))
    .reduce(initialIncome, ((companyIncome, transfer) -> {
        companyIncome.setInferredIncome(companyIncome.getInferredIncome().add(transfer.getNetReceivable()));
        return companyIncome;
    }));
Now the catch is that the data may cover only 3 months, in which case I have to extrapolate it to 12 months by multiplying by 4.
My idea is to take the first and the last item of the transfers list and check whether the data covers a whole year, but I can't think of a place to perform this check.
After reduce, the transfers data is gone, and before it I can't find a way to get this information and still reduce the transfers flux.
I am a little new to the reactive way of doing things and can't find a way to do this. Any help will be greatly appreciated. Thanks.
Upvotes: 3
Views: 2277
Reputation: 28351
For that purpose, the best solution is to store the necessary "metadata" in the reduced object. You already have a CompanyIncome object, so maybe that is a good place? Otherwise I'd introduce either a Tuple2 or some intermediate business object (e.g. CompanyIncomeAggregator) in which to store both the aggregated income and the information you need to decide at the end whether further processing is necessary.
Then, in a map step, you'd read that information, act on it, and return the computed income either as is or modified according to your criterion.
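As a rough illustration of the intermediate-object idea, here is a minimal sketch in plain Java (no Reactor dependency, so it can be tried in isolation). Everything in it is an assumption layered on the question: the fields and methods of CompanyIncomeAggregator are hypothetical, transferDate is assumed to be epoch milliseconds, and "only 3 months of data" is approximated as a span of at most 92 days between the earliest and latest transfer.

```java
import java.math.BigDecimal;
import java.time.Duration;

// Hypothetical accumulator: keeps the running sum plus the date range seen so far.
final class CompanyIncomeAggregator {
    // Assumption: a span of up to 92 days counts as "about one quarter".
    private static final long MAX_QUARTER_DAYS = 92;

    private BigDecimal sum = BigDecimal.ZERO;
    private long minDate = Long.MAX_VALUE;
    private long maxDate = Long.MIN_VALUE;

    // Accumulator step: add one transfer's amount and widen the observed date range.
    // Assumption: transferDateMillis is an epoch timestamp in milliseconds.
    CompanyIncomeAggregator add(BigDecimal netReceivable, long transferDateMillis) {
        sum = sum.add(netReceivable);
        minDate = Math.min(minDate, transferDateMillis);
        maxDate = Math.max(maxDate, transferDateMillis);
        return this;
    }

    // Final step: decide, from the stored metadata, whether to extrapolate.
    BigDecimal annualised() {
        if (minDate > maxDate) {
            return sum; // no transfers were seen, nothing to extrapolate
        }
        long coveredDays = Duration.ofMillis(maxDate - minDate).toDays();
        return coveredDays <= MAX_QUARTER_DAYS
                ? sum.multiply(BigDecimal.valueOf(4))
                : sum;
    }
}
```

In the chain from the question, this could plausibly replace the reduce step with something like reduceWith(CompanyIncomeAggregator::new, (agg, t) -> agg.add(t.getNetReceivable(), t.getTransferDate())), followed by a map that copies agg.annualised() into a new CompanyIncome. Note that under the 92-day assumption a single transfer is also treated as a partial year, so the threshold may need adjusting to the real business rule.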
Important note: Using variables external to the reactive chain is a code smell, as it introduces leaky shared state: if two subscriptions are made to the same Mono, they'll work on the same CompanyIncome object. You can remedy this by using reduceWith, which takes a Supplier for the initial value: reduceWith(CompanyIncome::new, ...).
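The shared-state problem can be reproduced without Reactor. The sketch below is only a plain-Java analogy, not Reactor code: run stands in for one subscription folding values into a seed, and Income is a hypothetical stand-in for CompanyIncome. It contrasts reusing one seed object across two runs with obtaining a fresh seed from a Supplier each time, which is the behaviour reduceWith gives per subscription.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.function.Supplier;

final class SharedStateDemo {
    // Minimal mutable holder, a hypothetical stand-in for CompanyIncome.
    static final class Income {
        BigDecimal total = BigDecimal.ZERO;
    }

    // One "subscription": fold every value into the given seed and return it.
    static Income run(Income seed, List<BigDecimal> values) {
        for (BigDecimal value : values) {
            seed.total = seed.total.add(value);
        }
        return seed;
    }

    // Two runs over one shared seed: the second run starts from the first run's total.
    static BigDecimal secondRunWithSharedSeed(List<BigDecimal> values) {
        Income shared = new Income();
        run(shared, values);
        return run(shared, values).total;
    }

    // Two runs with a Supplier: each run gets its own fresh seed.
    static BigDecimal secondRunWithSupplier(List<BigDecimal> values) {
        Supplier<Income> freshSeed = Income::new;
        run(freshSeed.get(), values);
        return run(freshSeed.get(), values).total;
    }
}
```

With the shared seed, the second run reports the values counted twice; with the Supplier, both runs report the same total.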
Upvotes: 1