Reputation: 40520
I'm using Spring Boot 2.0.2 and I'd like to insert the contents of a Flux into an array in a MongoDB document using Spring Data's reactive MongoDB support (ReactiveMongoOperations). For example (Kotlin code):
val mongo : ReactiveMongoOperations = ...
data class Something(val data: String)
val flux = Flux.just(Something("A"), Something("B"))
mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(flux),
"collection")
(Note that this is just an example; flux is not generated using Flux.just(..) in the actual code.)
While this runs without error, it doesn't yield the result I want:
{ "_id" : "myId", "myArray" : [ { "array" : [ { "data" : "A" }, { "data" : "B" } ], "_class" : "reactor.core.publisher.FluxArray" } ] }
I'd like the result to be:
{ "_id" : "myId", "myArray" : [ { "data" : "A" }, { "data" : "B" } ] }
i.e. Spring Data should serialize the elements of the Flux and not the Flux itself. This example yields the correct result:
mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(Something("A"), Something("B")),
"collection")
How can I achieve this without blocking?
Upvotes: 0
Views: 1140
Reputation: 18119
You need to collect the items upfront and $push a collection of items.
You can't use reactive wrapper types as values in a MongoDB query or update, because they must be resolved to concrete values upfront. You can achieve what you want by applying the collectList() and flatMap() operators:
val flux: Flux<Something> = ...
flux.collectList()
.flatMap { mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(it),
"collection") }
Note that collectList() holds all elements in memory before the upsert runs. If the number of elements grows beyond a reasonable limit, then buffer(n) might be the more appropriate choice.
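A minimal sketch of the buffer(n) variant, in case it helps. The helper name pushInBatches and the default batch size of 100 are assumptions made for illustration; they are not part of Spring Data or Reactor. It uses concatMap so that only one upsert is in flight at a time and batches are appended in the order they were emitted:

```kotlin
import com.mongodb.client.result.UpdateResult
import org.springframework.data.mongodb.core.ReactiveMongoOperations
import org.springframework.data.mongodb.core.query.Criteria.where
import org.springframework.data.mongodb.core.query.Query.query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.data.mongodb.core.query.isEqualTo
import reactor.core.publisher.Flux

data class Something(val data: String)

// Hypothetical helper (not a library API): pushes the elements of `flux`
// into "myArray" in batches of at most `batchSize` elements.
fun pushInBatches(
    mongo: ReactiveMongoOperations,
    flux: Flux<Something>,
    batchSize: Int = 100 // assumed value; tune it for your element sizes
): Flux<UpdateResult> =
    flux.buffer(batchSize) // Flux<List<Something>>
        // concatMap subscribes to the next upsert only after the previous
        // one completes, which keeps the batches in emission order.
        .concatMap { batch ->
            mongo.upsert(
                query(where("_id").isEqualTo("myId")),
                // Spread the batch so each element is passed as its own vararg value.
                Update().push("myArray").each(*batch.toTypedArray()),
                "collection"
            )
        }
```

Nothing is written until the returned Flux is subscribed to. Each batch is a separate update, so the array is not filled atomically; if a later batch fails, earlier batches have already been written.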
Upvotes: 1