Kobe Janssens

Python - How to use MongoDB (pymongo) with multiprocessing without the "MongoClient opened before fork." issue?

I'm using multiprocessing, but every process emits the warning "MongoClient opened before fork." I did some research and concluded that I'm now creating multiple MongoClients (one per subprocess), but I didn't find a real solution. Every process uses the MongoDB connection (I'm using pymongo as the driver). Can someone help me?

Code:

import pymongo
from multiprocessing import Process


def func1():
    while True:
        col1.insert_one({...})
        ...

def func2():
    while True:
        col2.insert_one({...})
        ...

if __name__ == "__main__":
    # MongoDB
    myclient = pymongo.MongoClient("mongodb://localhost:27017/")
    mydb = myclient["testdb"]
    col1 = mydb["col1"]
    col2 = mydb["col2"]

    # Multiprocessing
    p1 = Process(target=func1)
    p2 = Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Answers (1)

AKX

Have each process open its own MongoDB connection(s).

Heed the warning in get_mongo_client(): if you want a version that is safe to call from anywhere (including the parent process before forking), you'll need to "tag" _mongo_client with the PID of the process that created it, and discard the object when that PID doesn't match the current process.

import pymongo
from multiprocessing import Process

_mongo_client = None  # Global per process


def get_mongo_client():
    # Make sure not to call this within the master process, or things
    # will break again.
    global _mongo_client
    if _mongo_client is None:
        _mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
    return _mongo_client


def get_mongo_col(collection, database="testdb"):
    client = get_mongo_client()
    return client[database][collection]


def func1():
    col1 = get_mongo_col("col1")
    while True:
        col1.insert_one({})
        # ...


def func2():
    col2 = get_mongo_col("col2")
    while True:
        col2.insert_one({})
        # ...


def main():
    # Multiprocessing
    p1 = Process(target=func1)
    p2 = Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == "__main__":
    main()
