towi

Reputation: 22267

How to unroll multiple Flux and keep the originals?

I want to generate an interleaved stream of two object types.

I have the two helper methods

The overall result should be a Flux<JsonNode> to accommodate the mixing of the two object types.

So, the result would be something like

[ {'name':'Aaa-X'}, 
    {'name':'Bbb-x1'},
    {'name':'Bbb-x2'},
  {'name':'Aaa-Y'},
    {'name':'Bbb-y1'},
    {'name':'Bbb-y2'},
    {'name':'Bbb-y3'}
]

As a rough sketch I tried this:

final ObjectMapper om = new ObjectMapper();

public Flux<JsonNode> create() {
  return Flux.range(0, 2)       // create 2
    .map( idx -> genAaa() )     // bare Aaa's
    .flatMap( a -> genBbbs(a) ) // bare Aaa to Flux<Bbb> ???
    .map( om::valueToTree );    // anything to JsonNode
}

But I have one big issue here:

Because I transform the Aaa objects (and thus consume them), they are no longer in the result. I have no idea how I can "use" them and still keep them in this scenario.

I was wondering whether I could pass the "flux in progress" as a parameter to the generate functions, so they each add JsonNodes as they are created, but that feels wrong (totally not-async) and I wouldn't know how anyway. I suppose there is a concept in Fluxes that still eludes me.

Upvotes: 1

Views: 96

Answers (1)

Denis Zavedeev

Reputation: 8297

You can use Flux#concat together with your genBbbs method (named getBbbs in the code below) inside the function passed to flatMap:

private static Flux<JsonNode> combine() {
    ObjectMapper objectMapper = new ObjectMapper();
    return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
            .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
            .map(objectMapper::valueToTree); // Flux<JsonNode>
}

The concat method simply concatenates two sources, in order:

  1. A Mono<Aaa> created artificially with Mono.just(aaa)
  2. The Flux<Bbb> returned by the getBbbs(aaa) invocation
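
The "emit the parent first, then its children" shape described above can also be seen without Reactor. The following is only a synchronous analogy using java.util.stream, not the Reactor code from this answer; the class name InterleaveSketch and the helpers children and interleave are hypothetical, and plain strings stand in for Aaa and Bbb.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InterleaveSketch {

    // Hypothetical stand-in for getBbbs(aaa): two children named after the parent.
    static Stream<String> children(String parent) {
        return Stream.of("B1-" + parent, "B2-" + parent);
    }

    // For each parent, concatenate a one-element stream holding the parent
    // with the stream of its children, so the parent stays in the result.
    static List<String> interleave(List<String> parents) {
        return parents.stream()
                .flatMap(p -> Stream.concat(Stream.of(p), children(p)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints each parent followed by its two children.
        interleave(List.of("a0", "a1")).forEach(System.out::println);
    }
}
```

One difference to keep in mind: a sequential Stream keeps encounter order in flatMap, whereas Reactor's Flux#flatMap may interleave the inner publishers when they emit asynchronously. If strict parent-then-children ordering matters in that case, Flux#concatMap is the order-preserving alternative.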

Example output:

{"name":"a0"}
{"name":"B1-a0"}
{"name":"B2-a0"}
{"name":"a1"}
{"name":"B1-a1"}
{"name":"B2-a1"}

Full listing:

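import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Main {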
    @AllArgsConstructor
    @Data
    private static class Aaa {
        private String name;
    }

    @AllArgsConstructor
    @Data
    private static class Bbb {
        private String name;
    }

    private static Mono<Aaa> getAaa(String name) {
        return Mono.just(new Aaa(name));
    }

    private static Flux<Bbb> getBbbs(Aaa aaa) {
        return Flux.just(new Bbb("B1-" + aaa.getName()), new Bbb("B2-" + aaa.getName()));
    }


    public static void main(String[] args) {
        combine().subscribe(System.out::println);
    }

    private static Flux<JsonNode> combine() {
        ObjectMapper objectMapper = new ObjectMapper();
        return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
                .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
                .map(objectMapper::valueToTree); // Flux<JsonNode>
    }
}

Upvotes: 2
