sh1nen
sh1nen

Reputation: 199

Reduce stream of Flux to Flux

I am having some problems with applying to reduce operation on Stream<Flux<T>>, I would like to reduce it to Flux<T>. Each AdProvider provides offers as Flux, I would like to use the stream to get all offers from each one of them and concatenate them to one pipeline. How could I possibly do that with reduce?

Set<AdProvider> adProviders;

@Override
@LogBefore
public void gather()
{
    adProviders
        .parallelStream()
        .map(this::gatherOffers)
        .reduce(?)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

Upvotes: 0

Views: 4555

Answers (1)

Oleh Dokuka
Oleh Dokuka

Reputation: 12184

Flatten Stream<Flux> using Flux#fromStream() + Flux#flatMap()

In order to solve the problem, you may combine Flux#fromStream() (which convert Stream<Flux> to Flux<Flux>) and Flux#flatMap() (which flatten inner fluxes to flat Flux) as in the following example:

Set<AdProvider> adProviders;

@Override
public void gather()
{
    Flux.fromStream(adProviders.stream())
        .parallel() // replace .parallelStream with separate parallel + runOn
        .runOn(Schedulers.parallel())
        .flatMap(this::gatherOffers)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

As it might be noticed, I replaced parallelStream with plain .stream and parallel + runOn which do almost the same.

Alternatively, you may avoid using stream at all and simply rely on Flux.fromIterble + the same Flux#flatMap:

Set<AdProvider> adProviders;

@Override
public void gather()
{
    Flux.fromIterable(adProviders)
        .parallel() // replace .parallelStream with separate parallel + runOn
        .runOn(Schedulers.parallel())
        .flatMap(this::gatherOffers)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

Upvotes: 1

Related Questions