Clyde

Reputation: 171

Using mongoengine with multiprocessing - how do you close mongoengine connections?

No matter what I try, I keep hitting the "MongoClient opened before fork" warning about not forking active Mongo connections when I use multiprocessing on a mongoengine database. The standard MongoDB advice is to connect to the database only from within the child processes. I think what I'm doing should be functionally equivalent, because I close the database connection before starting the multiprocessing pool, but I still hit the problem.

Related questions, either without a minimal example or with solutions that don't apply here, are here, here, and (specifically for the case of Flask/Celery) here.

Minimal example to reproduce the problem:

from mongoengine import connect, Document, StringField, ListField, ReferenceField
from pathos.multiprocessing import ProcessingPool


class Base(Document):
    key = StringField(primary_key=True)
    name = StringField()
    parent = ReferenceField('Parent', required=True)

class Parent(Document):
    key = StringField(primary_key=True)
    name = StringField()
    bases = ListField(ReferenceField('Base'))


def remove_base(key):
    # connect from inside the child process, as the MongoDB advice suggests
    db = connect('mydb')
    mongo_b = Base.objects().get(key=key)
    # drop the reference held by the parent, then delete the document itself
    mongo_b.parent.update(pull__bases=mongo_b)
    mongo_b.delete()


### setup
db = connect('mydb', connect=False)

# save the parent first: ReferenceField expects a saved document, not a bare key string
p = Parent(key='p1', name='parent').save()

Base(key='b1', name='test', parent=p).save()
Base(key='b2', name='test', parent=p).save()
Base(key='b3', name='test2', parent=p).save()

p.update(add_to_set__bases='b1')
p.update(add_to_set__bases='b2')
p.update(add_to_set__bases='b3')

### find objects we want to delete
my_base_objects = Base.objects(name='test')
keys = [b.key for b in my_base_objects]
del my_base_objects

# close db to avoid problems?!
db.close()
del db

# parallel map removing base objects and references from the db
# warning generated here
pp = ProcessingPool(2)
pp.map(remove_base, keys)

Upvotes: 4

Views: 2243

Answers (2)

bagerard

Reputation: 6374

This was recently improved: as of MongoEngine 0.18.0, use disconnect() to close a single connection and disconnect_all() to close all existing connections (see the 0.18.0 changelog).

See the official documentation.

Upvotes: 1

Clyde

Reputation: 171

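OK, so I figured it out. MongoEngine caches connections to the database in several places: the module-level dictionaries in mongoengine.connection and the _collection attribute on each Document class. Removing these caches manually resolves the issue. First add the following import:

from mongoengine import connection

then add the following to the '# close db' section:

# private module-level caches of clients, connection settings and databases
connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}

# each Document class also caches its pymongo Collection
Base._collection = None
Parent._collection = None

This appears to solve the issue.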

Complete code:

from mongoengine import connect, Document, StringField, ListField, ReferenceField, connection
from pathos.multiprocessing import ProcessingPool


class Base(Document):
    key = StringField(primary_key=True)
    name = StringField()
    parent = ReferenceField('Parent', required=True)

class Parent(Document):
    key = StringField(primary_key=True)
    name = StringField()
    bases = ListField(ReferenceField('Base'))


def remove_base(key):
    # each worker registers its own (lazy) connection after the fork
    db = connect('mydb', connect=False)
    mongo_b = Base.objects().get(key=key)
    # drop the reference held by the parent, then delete the document itself
    mongo_b.parent.update(pull__bases=mongo_b)
    mongo_b.delete()


def setup():
    # save the parent first: ReferenceField expects a saved document, not a bare key string
    p = Parent(key='p1', name='parent').save()

    Base(key='b1', name='test', parent=p).save()
    Base(key='b2', name='test', parent=p).save()
    Base(key='b3', name='test2', parent=p).save()

    p.update(add_to_set__bases='b1')
    p.update(add_to_set__bases='b2')
    p.update(add_to_set__bases='b3')
db = connect('mydb', connect=False)
setup()
### find objects we want to delete
my_base_objects = Base.objects(name='test')
keys = [b.key for b in my_base_objects]
del my_base_objects


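### close db to avoid problems?!
db.close()
db = None

# clear mongoengine's private caches of clients, connection settings and databases
connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}

# clear the pymongo Collection each Document class caches
Base._collection = None
Parent._collection = None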

### parallel map removing base objects from the db
pp = ProcessingPool(2)
pp.map(remove_base, keys)

Upvotes: 2
