Reputation: 199
I am having trouble applying a reduce operation to a Stream<Flux<T>>; I would like to reduce it to a single Flux<T>. Each AdProvider provides its offers as a Flux. I would like to use the stream to get all offers from every provider and concatenate them into one pipeline. How can I do that with reduce?
Set<AdProvider> adProviders;

@Override
@LogBefore
public void gather()
{
    adProviders
        .parallelStream()
        .map(this::gatherOffers)
        .reduce(?)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
        return Flux.empty();
    }
}
Upvotes: 0
Views: 4555
Reputation: 12184
Flattening Stream<Flux> using Flux#fromStream() + Flux#flatMap()
To solve the problem, you can combine Flux#fromStream() (which turns a Stream into a Flux, so a Stream<Flux> becomes a Flux<Flux>) with Flux#flatMap() (which flattens the inner fluxes into a single flat Flux), as in the following example:
Set<AdProvider> adProviders;

@Override
public void gather()
{
    Flux.fromStream(adProviders.stream())
        .parallel() // replace .parallelStream with separate parallel + runOn
        .runOn(Schedulers.parallel())
        .flatMap(this::gatherOffers)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
        return Flux.empty();
    }
}
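To see the flattening step in isolation, here is a minimal sketch without any parallelism. The class name FlattenStreamSketch and the String payloads are hypothetical stand-ins for AdProvider#offers() and Ad; only Flux.fromStream, Flux#flatMap, collectList and block are real Reactor calls.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

final class FlattenStreamSketch {

    // Hypothetical helper: each element of the stream plays the role of one provider's offers.
    static Flux<String> flatten(Stream<Flux<String>> perProvider) {
        return Flux.fromStream(perProvider)   // Flux<Flux<String>>
                   .flatMap(inner -> inner);  // subscribe to every inner Flux and merge the items
    }

    public static void main(String[] args) {
        Stream<Flux<String>> offers = Stream.of(
                Flux.just("a1", "a2"),
                Flux.just("b1"),
                Flux.<String>empty());

        // block() is used only to make the demo print something; avoid it in reactive code paths.
        List<String> all = flatten(offers).collectList().block();
        System.out.println(all);
    }
}
```

Note that flatMap merges the inner fluxes as their items arrive, so the relative order of offers from different providers should not be relied on.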
As you may have noticed, I replaced parallelStream with a plain .stream plus parallel + runOn, which do almost the same thing.
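A small sketch of what parallel + runOn does: parallel() only splits the sequence into "rails", and it is runOn that moves the work on each rail to the given scheduler. The class ParallelRailsSketch and its method are hypothetical names used for illustration; sequential() is added here only to merge the rails back into a plain Flux so the result can be collected.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class ParallelRailsSketch {

    // Returns the distinct names of the threads that processed the items.
    static List<String> threadNamesUsed(int items) {
        return Flux.range(1, items)
                .parallel()                                  // split into rails (one per CPU core by default)
                .runOn(Schedulers.parallel())                // run each rail on the parallel scheduler
                .map(i -> Thread.currentThread().getName())  // executed on the scheduler's worker threads
                .sequential()                                // merge the rails back into a single Flux
                .distinct()
                .collectList()
                .block();                                    // demo only
    }

    public static void main(String[] args) {
        System.out.println(threadNamesUsed(16));
    }
}
```

Without the runOn call, the rails would still be processed on the subscribing thread, so both operators are needed to get behaviour comparable to parallelStream.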
Alternatively, you can avoid using a stream at all and simply rely on Flux.fromIterable + the same Flux#flatMap:
Set<AdProvider> adProviders;

@Override
public void gather()
{
    Flux.fromIterable(adProviders)
        .parallel() // replace .parallelStream with separate parallel + runOn
        .runOn(Schedulers.parallel())
        .flatMap(this::gatherOffers)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
        return Flux.empty();
    }
}
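If you do want to stay with Stream#reduce as in the question, one possible sketch is to reduce with Flux.empty() as the identity and Flux#concatWith as the accumulator. The class ReduceFluxSketch and the method concatAll are hypothetical names; the Integer payloads stand in for Ad.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

final class ReduceFluxSketch {

    // Folds a Stream<Flux<T>> into one Flux<T> by chaining the fluxes one after another.
    static <T> Flux<T> concatAll(Stream<Flux<T>> fluxes) {
        // Flux.empty() is a valid identity: concatenating with an empty Flux changes nothing.
        return fluxes.reduce(Flux.empty(), (left, right) -> left.concatWith(right));
    }

    public static void main(String[] args) {
        Flux<Integer> all = concatAll(Stream.of(
                Flux.just(1, 2),
                Flux.just(3),
                Flux.just(4, 5)));

        // Nothing is subscribed until here; reduce only assembles the pipeline.
        List<Integer> result = all.collectList().block(); // demo only
        System.out.println(result); // prints [1, 2, 3, 4, 5]
    }
}
```

In the question's code this would correspond to .reduce(Flux.empty(), (a, b) -> a.concatWith(b)). Keep in mind that concatWith subscribes to the fluxes one after another, so the providers are consumed sequentially; swapping in Flux#mergeWith would subscribe to them eagerly and interleave their items instead.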
Upvotes: 1