Reputation: 489
I'm trying to build a reactive system using Akka, Rx, Vert.x and MongoDB, where the flow (downstream) looks pretty much like:
publisher -> akka streams -> rx streams -> vert.x event bus -> vert.x mongo rx client -> mongo client
I ran into a situation where the upstream invokes too many update-collection actions on Mongo, and it ends up with:
com.mongodb.MongoWaitQueueFullException: Too many threads are already waiting for a connection. Max number of threads (maxWaitQueueSize) of 500 has been exceeded.
The updateCollection call:
- is triggered concurrently by about 10 publishers, each pushing an element every 0.1 second
- is performed on the same collection
- adds a new element to an array embedded in a document
1) As this is a reactive system, it would be nice to backpressure the source, performing an HTTP request only as often as writes to Mongo allow, so that the queue of threads waiting for a Mongo connection wouldn't grow.
Is there any pattern or example of such backpressure with Mongo to follow, or should I invent and implement it on my own?
How can I access and observe the number of threads awaiting a connection via the Vert.x Mongo client?
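One simple pattern for 1) is to cap the number of in-flight writes well below the driver's wait-queue limit, so producers block (are backpressured) instead of piling threads into the driver's wait queue; in Akka Streams the equivalent is mapAsync with a bounded parallelism. Below is a framework-free sketch using only the JDK, where updateCollection is a hypothetical stand-in for the real async Mongo call:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedWrites {
    // Hypothetical stand-in for the async Mongo update; the sleep simulates write latency.
    static CompletableFuture<Void> updateCollection(ExecutorService io) {
        return CompletableFuture.runAsync(() -> {
            try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }, io);
    }

    /** Runs `tasks` updates with at most `permits` in flight; returns the peak concurrency observed. */
    static int run(int permits, int tasks) throws InterruptedException {
        ExecutorService io = Executors.newCachedThreadPool();
        Semaphore inFlight = new Semaphore(permits);   // the backpressure valve
        AtomicInteger current = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);

        for (int i = 0; i < tasks; i++) {
            inFlight.acquire();                        // producer blocks here when writes are saturated
            peak.accumulateAndGet(current.incrementAndGet(), Math::max);
            updateCollection(io).whenComplete((v, t) -> {
                current.decrementAndGet();
                inFlight.release();                    // hand the slot to the next waiting producer
                done.countDown();
            });
        }
        done.await();
        io.shutdown();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak in-flight = " + run(8, 100) + " (limit 8)");
    }
}
```

Because the semaphore is acquired before each write starts and released only when it completes, the in-flight count can never exceed the permit count, which keeps the driver's wait queue from growing.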
2) While looking into the code of the Vert.x Mongo client, it turns out that it doesn't keep a session to Mongo open; a new one is used for every update action. You can observe this in io.vertx.ext.mongo.impl.MongoClientImpl.updateCollection(...), where underneath it calls MongoCollection.updateOne(...) without passing the ClientSession clientSession parameter.
While 10 concurrent updates per second seems like a small number, the question is: could the creation of the ClientSession be taking a long time and therefore causing the threads to queue?
Also, what were the design decisions behind not caching the connection to Mongo in the Vert.x Mongo client?
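As a side note, the 500 in the exception is, in the 3.x driver, maxPoolSize * waitQueueMultiple (defaults 100 * 5). If tuning the ceiling is acceptable alongside backpressure, the Vert.x Mongo client accepts pool options in its JSON config; the key names below are from the Vert.x 3.x docs, so verify them against your version:

```json
{
  "connection_string": "mongodb://localhost:27017",
  "db_name": "mydb",
  "maxPoolSize": 100,
  "waitQueueMultiple": 10,
  "waitQueueTimeoutMS": 120000
}
```

Note that raising these values only moves the ceiling; it does not slow the producers down, so it is a complement to backpressure, not a substitute.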
The mongostat output is:
insert query update delete getmore command dirty used flushes vsize res qrw arw net_in net_out conn time
*0 *0 32 *0 0 139|0 9.8% 10.8% 0 1.84G 666M 0|0 1|89 29.6k 62.9k 104 Jan 19 02:33:51.980
*0 *0 6 *0 0 4|0 18.7% 18.7% 0 2.90G 1.50G 0|0 1|100 2.41k 9.59k 104 Jan 19 02:33:59.342
*0 *0 *0 *0 0 2|0 15.7% 17.2% 0 3.52G 1.60G 0|0 1|97 493b 7.90k 104 Jan 19 02:34:07.480
*0 *0 9 *0 0 3|0 14.7% 17.2% 0 3.52G 1.57G 0|0 1|100 3.10k 18.7k 104 Jan 19 02:34:10.955
*0 *0 1 *0 0 1|0 21.4% 23.1% 0 3.52G 1.57G 0|0 1|100 749b 7.46k 104 Jan 19 02:34:19.579
*0 *0 10 *0 0 16|0 36.7% 37.4% 0 3.57G 1.57G 0|0 1|100 4.79k 73.7k 104 Jan 19 02:34:20.443
*0 *0 *0 *0 0 9|0 53.6% 54.0% 0 3.62G 1.56G 0|0 1|100 1.33k 47.6k 104 Jan 19 02:34:21.769
*0 *0 1 *0 0 13|0 54.5% 55.2% 0 3.62G 1.57G 0|0 1|100 1.92k 70.6k 104 Jan 19 02:34:22.659
*0 *0 *0 *0 0 23|0 70.5% 70.9% 0 3.62G 1.56G 0|0 1|100 2.75k 122k 104 Jan 19 02:34:23.173
*0 *0 *0 *0 0 31|0 72.1% 72.5% 0 3.62G 1.58G 0|0 1|100 3.56k 153k 104 Jan 19 02:34:23.586
I'd really appreciate your help.
Upvotes: 1
Views: 735
Reputation: 1712
I don't think you need backpressure: if you constantly receive more than you can process, it will allocate all the memory and cause an exception in your case anyway. I think the better choice is to create an emitter, have every stream emit its items to that emitter, and use mongoRepo.saveAll(emitter):
processor1 ->
processor2 ->
processor3 ->
processor4 ->
processor5 -> EMITTING TO EMITTER - EMITTER SAVING TO MONGO using saveAll(emitter)
processor6 ->
processor7 ->
processor8 ->
processor9 ->
processor10 ->
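A minimal, framework-free sketch of that emitter idea: the ten processors emit into one shared queue, and a single flusher drains it and saves each drained chunk as one batch. Here saveAll is a hypothetical stand-in for a bulk Mongo write such as mongoRepo.saveAll:

```java
import java.util.*;
import java.util.concurrent.*;

public class BatchingEmitter {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<List<String>> saved = new ArrayList<>(); // stands in for Mongo

    // Hypothetical stand-in for mongoRepo.saveAll(batch): one bulk write per batch.
    private void saveAll(List<String> batch) { saved.add(new ArrayList<>(batch)); }

    // Each processor calls emit(...) instead of writing to Mongo directly.
    public void emit(String item) { queue.add(item); }

    // Drain whatever has accumulated and save it as a single batch, so ten
    // producers cost one bulk write per flush instead of ten separate updates.
    public int flush() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) saveAll(batch);
        return batch.size();
    }

    public static void main(String[] args) {
        BatchingEmitter emitter = new BatchingEmitter();
        for (int p = 1; p <= 10; p++) emitter.emit("processor" + p);
        System.out.println("items flushed in one batch = " + emitter.flush());
    }
}
```

The design point is that batching collapses many small concurrent updates into few bulk writes, which reduces pressure on the connection pool; unlike true backpressure, though, the queue is unbounded here, so if producers permanently outpace the flusher it will grow without limit.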
Upvotes: 1