user1913596

Reputation: 489

Vert.x Mongo client overwhelming MongoDB with concurrent updates

I'm trying to build a reactive system using Akka, RxJava, Vert.x and MongoDB, where the (downstream) flow looks pretty much like:

publisher -> akka streams -> rx streams -> vert.x event bus -> vert.x mongo rx client -> mongo client

I ran into a situation where the upstream invokes too many updateCollection actions on MongoDB, and it ends up with:

com.mongodb.MongoWaitQueueFullException: Too many threads are already waiting for a connection. Max number of threads (maxWaitQueueSize) of 500 has been exceeded.
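If I read the driver code correctly, the 500 is not arbitrary: in the 3.x Java driver, maxWaitQueueSize = waitQueueMultiple * maxPoolSize, and the defaults (5 * 100) give exactly 500. A minimal sketch of how those two knobs can be tuned through the Vert.x Mongo client config (the values here are illustrative, not my real setup):

    import io.vertx.core.Vertx;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.mongo.MongoClient;

    class MongoSetup {
        // Pool knobs in the Vert.x Mongo client config. With the 3.x driver,
        // maxWaitQueueSize = waitQueueMultiple * maxPoolSize (5 * 100 = 500 by default).
        static MongoClient create(Vertx vertx) {
            JsonObject config = new JsonObject()
                .put("connection_string", "mongodb://localhost:27017") // illustrative
                .put("db_name", "test")                                // illustrative
                .put("maxPoolSize", 100)
                .put("waitQueueMultiple", 5);
            return MongoClient.createShared(vertx, config);
        }
    }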

The updateCollection:
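A minimal sketch of the call, assuming the rxified client (the "events" collection and field names are placeholders, not my real schema):

    import io.reactivex.Single;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.mongo.MongoClientUpdateResult;
    import io.vertx.reactivex.ext.mongo.MongoClient;

    // One update per incoming message.
    Single<MongoClientUpdateResult> update(MongoClient client, JsonObject msg) {
        JsonObject query  = new JsonObject().put("_id", msg.getString("id"));
        JsonObject update = new JsonObject()
            .put("$set", new JsonObject().put("payload", msg));
        return client.rxUpdateCollection("events", query, update);
    }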

1) As this is a reactive system, it would be nice to backpressure the source, so that an HTTP request is performed only as often as it is possible to write to Mongo, and the queue of threads waiting for a Mongo connection doesn't grow.

Is there any pattern or example of such backpressure with Mongo to follow, or should I invent and implement it on my own? The closest I've come up with is sketched below.
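My sketch bounds the number of in-flight writes with flatMapSingle's maxConcurrency, so the Flowable only requests more from upstream as writes complete (the message stream and names are made up):

    import io.reactivex.Flowable;
    import io.vertx.core.json.JsonObject;
    import io.vertx.reactivex.ext.mongo.MongoClient;

    // At most 10 concurrent updates; backpressure propagates upstream because
    // flatMapSingle only subscribes to new inner Singles as slots free up.
    void writeWithBackpressure(MongoClient client, Flowable<JsonObject> messages) {
        messages
            .flatMapSingle(
                msg -> client.rxUpdateCollection("events",
                        new JsonObject().put("_id", msg.getString("id")),
                        new JsonObject().put("$set", new JsonObject().put("payload", msg))),
                false, // delayErrors
                10)    // maxConcurrency
            .subscribe(res -> { }, Throwable::printStackTrace);
    }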

How can I access and observe the number of threads awaiting a connection via the Vert.x Mongo client?
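The only approach I've found so far is to hook the underlying driver rather than the Vert.x client itself, using the 3.x driver's pool events; a sketch, assuming the 3.x event API:

    import java.util.concurrent.atomic.AtomicInteger;
    import com.mongodb.event.ConnectionPoolListenerAdapter;
    import com.mongodb.event.ConnectionPoolWaitQueueEnteredEvent;
    import com.mongodb.event.ConnectionPoolWaitQueueExitedEvent;

    // Gauge of threads currently waiting for a pooled connection.
    class WaitQueueGauge extends ConnectionPoolListenerAdapter {
        final AtomicInteger waiting = new AtomicInteger();

        @Override
        public void waitQueueEntered(ConnectionPoolWaitQueueEnteredEvent event) {
            waiting.incrementAndGet();
        }

        @Override
        public void waitQueueExited(ConnectionPoolWaitQueueExitedEvent event) {
            waiting.decrementAndGet();
        }
    }

Registering it would mean building the driver settings by hand, e.g. ConnectionPoolSettings.builder().addConnectionPoolListener(new WaitQueueGauge()), which as far as I can tell the Vert.x client config does not expose.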

2) While looking into the code of the Vert.x Mongo client, it turns out that it does not keep the connection to Mongo open, and a new session is opened for every update action. You can observe this in io.vertx.ext.mongo.impl.MongoClientImpl.updateCollection(...), where underneath it calls MongoCollection.updateOne(...) without passing the ClientSession clientSession parameter.

While 10 concurrent updates per second seems like a small number, the question is: could the creation of a ClientSession be taking long enough to cause the threads to queue?

Also, what were the design decisions behind not caching the connection to Mongo in the Vert.x Mongo client?

The mongostat is:

insert query update delete getmore command dirty  used flushes vsize   res qrw  arw net_in net_out conn                time
    *0    *0     32     *0       0   139|0  9.8% 10.8%       0 1.84G  666M 0|0 1|89  29.6k   62.9k  104 Jan 19 02:33:51.980
    *0    *0      6     *0       0     4|0 18.7% 18.7%       0 2.90G 1.50G 0|0 1|100  2.41k   9.59k  104 Jan 19 02:33:59.342
    *0    *0     *0     *0       0     2|0 15.7% 17.2%       0 3.52G 1.60G 0|0  1|97   493b   7.90k  104 Jan 19 02:34:07.480
    *0    *0      9     *0       0     3|0 14.7% 17.2%       0 3.52G 1.57G 0|0 1|100  3.10k   18.7k  104 Jan 19 02:34:10.955
    *0    *0      1     *0       0     1|0 21.4% 23.1%       0 3.52G 1.57G 0|0 1|100   749b   7.46k  104 Jan 19 02:34:19.579
    *0    *0     10     *0       0    16|0 36.7% 37.4%       0 3.57G 1.57G 0|0 1|100  4.79k   73.7k  104 Jan 19 02:34:20.443
    *0    *0     *0     *0       0     9|0 53.6% 54.0%       0 3.62G 1.56G 0|0 1|100  1.33k   47.6k  104 Jan 19 02:34:21.769
    *0    *0      1     *0       0    13|0 54.5% 55.2%       0 3.62G 1.57G 0|0 1|100  1.92k   70.6k  104 Jan 19 02:34:22.659
    *0    *0     *0     *0       0    23|0 70.5% 70.9%       0 3.62G 1.56G 0|0 1|100  2.75k    122k  104 Jan 19 02:34:23.173
    *0    *0     *0     *0       0    31|0 72.1% 72.5%       0 3.62G 1.58G 0|0 1|100  3.56k    153k  104 Jan 19 02:34:23.586

I'd really appreciate your help.

Upvotes: 1

Views: 735

Answers (1)

Ricard Kollcaku

Reputation: 1712

I don't think you need backpressure, because if you are constantly receiving more than you can process, it will allocate all memory and cause an exception in your case. I think the right choice is to create an emitter: every stream emits its items to this emitter, and the emitter saves to Mongo using mongoRepo.saveAll(emitter):

processor1  ->
processor2  ->
processor3  ->
processor4  ->
processor5  -> EMITTING TO EMITTER - EMITTER SAVING TO MONGO using saveAll(emitter)
processor6  ->
processor7  ->
processor8  ->
processor9  ->
processor10 ->
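A rough sketch of the idea, assuming a Spring Data reactive repository (the Event document class and stream names are made up):

    import java.util.List;
    import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
    import reactor.core.publisher.Flux;

    class Event { String id; } // placeholder document

    interface EventRepo extends ReactiveMongoRepository<Event, String> { }

    class EmitterPipeline {
        // Merge every processor's output into one emitter; the repository then
        // consumes it as a single stream via saveAll(Publisher).
        void pipeToMongo(EventRepo repo, List<Flux<Event>> processors) {
            Flux<Event> emitter = Flux.merge(processors); // processor1..processor10
            repo.saveAll(emitter).subscribe();
        }
    }

This way each processor only ever emits to the shared emitter, and there is a single consumer of the merged stream writing to Mongo.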

Upvotes: 1
