Reputation: 22267
I want to generate an interleaved stream of two object types.
I have two helper methods: Aaa genAaa() (or Mono<Aaa>?) to create one Aaa, and Flux<Bbb> genBbbs(Aaa a) to create a set of Bbbs from a given a.
The overall result should be a Flux<JsonNode> to accommodate the mixing of the two object types.
So, the result would be something like
[ {'name':'Aaa-X'},
{'name':'Bbb-x1'},
{'name':'Bbb-x2'},
{'name':'Aaa-Y'},
{'name':'Bbb-y1'},
{'name':'Bbb-y2'},
{'name':'Bbb-y3'}
]
As a rough sketch I tried this:
final ObjectMapper om = new ObjectMapper();

public Flux<JsonNode> create() {
    return Flux.range(0, 2)            // create 2
        .map( idx -> genAaa() )        // bare Aaa's
        .flatMap( a -> genBbbs(a) )    // bare Aaa to Flux<Bbb> ???
        .map( om::valueToTree );       // anything to JsonNode
}
But I have one big issue here:
Because I transform the Aaa objects (and thus consume them), they are no longer in the result. I have no idea how I can "use" them and still keep them in this scenario.
I was wondering whether I could pass the "flux in progress" as a parameter to the generate functions, so that they each add JsonNodes as they are created, but that feels wrong (totally not async) and I wouldn't know how to do it anyway. I suppose there is a concept in Fluxes that still eludes me.
Upvotes: 1
Views: 96
Reputation: 8297
You can use Flux#concat together with your genBbbs method (named getBbbs in the code that follows) inside the function passed to flatMap:
private static Flux<JsonNode> combine() {
    ObjectMapper objectMapper = new ObjectMapper();
    return Flux.concat(getAaa("a0"), getAaa("a1")) // Flux<Aaa>
        .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
        .map(objectMapper::valueToTree); // Flux<JsonNode>
}
The concat method simply concatenates two sources: Mono.just(aaa), which re-emits the Aaa itself, and the Flux<Bbb> returned by the getBbbs(aaa) invocation.
Example output:
{"name":"a0"}
{"name":"B1-a0"}
{"name":"B2-a0"}
{"name":"a1"}
{"name":"B1-a1"}
{"name":"B2-a1"}
Full listing:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Main {

    @AllArgsConstructor
    @Data
    private static class Aaa {
        private String name;
    }

    @AllArgsConstructor
    @Data
    private static class Bbb {
        private String name;
    }

    // Creates a single Aaa
    private static Mono<Aaa> getAaa(String name) {
        return Mono.just(new Aaa(name));
    }

    // Creates the Bbbs that belong to the given Aaa
    private static Flux<Bbb> getBbbs(Aaa aaa) {
        return Flux.just(new Bbb("B1-" + aaa.getName()), new Bbb("B2-" + aaa.getName()));
    }

    public static void main(String[] args) {
        combine().subscribe(System.out::println);
    }

    private static Flux<JsonNode> combine() {
        ObjectMapper objectMapper = new ObjectMapper();
        return Flux.concat(getAaa("a0"), getAaa("a1")) // Flux<Aaa>
            .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
            .map(objectMapper::valueToTree); // Flux<JsonNode>
    }
}
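If it helps to see the idea without Reactor, here is a minimal sketch of the same "emit the parent, then its children" pattern using plain java.util.stream. It is only an analogy, not Reactor code: the class InterleaveSketch and the helper children are hypothetical names made up for illustration, and plain strings stand in for Aaa and Bbb.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InterleaveSketch {

    // Hypothetical stand-in for genBbbs: derives two child names from a parent name.
    static Stream<String> children(String parent) {
        return Stream.of("B1-" + parent, "B2-" + parent);
    }

    // Emits each parent, immediately followed by its children.
    static List<String> interleave(List<String> parents) {
        return parents.stream()
                // Keep the parent by concatenating it with the stream derived from it.
                .flatMap(p -> Stream.concat(Stream.of(p), children(p)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints a0, B1-a0, B2-a0, a1, B1-a1, B2-a1 (one per line).
        interleave(List.of("a0", "a1")).forEach(System.out::println);
    }
}
```

One difference worth keeping in mind: a sequential Stream always processes one parent at a time, whereas Reactor's flatMap subscribes to inner publishers eagerly and can interleave their emissions when they complete asynchronously. If the strict Aaa-then-its-Bbbs order matters with asynchronous sources, concatMap takes the same kind of function and processes the inner publishers one after another.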
Upvotes: 2