Anna-Lischen

Reputation: 878

Multiprocessing + PyMongo leads to [Errno 111]

Good day!

I've just started playing around with pymongo and multiprocessing. I have received a multicore machine for my experiments, running Ubuntu 18.04.4 LTS (codename: bionic). For the sake of the experiment I have tried both Python 3.8 and Python 3.10; unfortunately, the results are similar:

>7lvv_E mol:na length:29  DNA (28-MER)
ELSE 7lvv_E
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "LoadDataOnSequence.py", line 54, in createCollectionPDB
    x = newCol.insert_one(dict2Write)
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/collection.py", line 698, in insert_one
    self._insert(document,
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/collection.py", line 613, in _insert
    return self._insert_one(
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/collection.py", line 602, in _insert_one
    self.__database.client._retryable_write(
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1497, in _retryable_write
    with self._tmp_session(session) as s:
  File "/usr/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1829, in _tmp_session
    s = self._ensure_session(session)
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1816, in _ensure_session
    return self.__start_session(True, causal_consistency=False)
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1766, in __start_session
    server_session = self._get_server_session()
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1802, in _get_server_session
    return self._topology.get_server_session()
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/topology.py", line 496, in get_server_session
    self._select_servers_loop(
  File "/home/username/.local/lib/python3.8/site-packages/pymongo/topology.py", line 215, in _select_servers_loop
    raise ServerSelectionTimeoutError(
pymongo.errors.ServerSelectionTimeoutError: 127.0.0.1:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 60db2071e53de99692268c6f, topology_type: Single, servers: [<ServerDescription ('127.0.0.1', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('127.0.0.1:27017: [Errno 111] Connection refused')>]>
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "LoadDataOnSequence.py", line 82, in <module>
    myPool.map(createCollectionPDB, listFile("datum/pdb_seqres.txt"))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
pymongo.errors.ServerSelectionTimeoutError: 127.0.0.1:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 60db2071e53de99692268c6f, topology_type: Single, servers: [<ServerDescription ('127.0.0.1', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('127.0.0.1:27017: [Errno 111] Connection refused')>]>

I have tried multiple times, modifying my code in different ways, with no luck. I have also tried both running the code from PyCharm via SSH and copying all the necessary files into a local folder on the multicore machine.

I count the number of cores and create my MongoClient:

from multiprocessing import *
from pymongo import MongoClient

# Number of cores
x = cpu_count()
print(x)

myClient = MongoClient('mongodb://127.0.0.1:27017/')

I prepare a list to pass, using that function:

def listFile(fileName):
    # Read the whole seqres file and group each ">" header line
    # with the sequence line(s) that follow it.
    with open(fileName) as fOpen:
        lines = fOpen.readlines()
    arrOfArrs = []
    tmp1 = []
    for i in lines:
        if i.startswith(">"):
            # A new record starts: store the previous one first.
            if len(tmp1) > 1:
                arrOfArrs.append(tmp1)
            tmp1 = [i.strip()]
        else:
            tmp1.append(i.strip())
    # Don't forget the last record in the file.
    if len(tmp1) > 1:
        arrOfArrs.append(tmp1)
    return arrOfArrs
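To make the format clearer, here is a tiny hypothetical example of the input and the grouping listFile() produces for it (the first header is the one from the output above; both sequences and the second record are made up for illustration):

import tempfile

sample = (
    ">7lvv_E mol:na length:29  DNA (28-MER)\n"
    "ACGTACGTACGT\n"
    ">1abc_A mol:protein length:12  EXAMPLE PROTEIN\n"
    "MKTAYIAKQRQI\n"
)

with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write(sample)

print(listFile(tmp.name))
# [['>7lvv_E mol:na length:29  DNA (28-MER)', 'ACGTACGTACGT'],
#  ['>1abc_A mol:protein length:12  EXAMPLE PROTEIN', 'MKTAYIAKQRQI']]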

That's how I prepare the big text file (in reality it will be even larger; I am just testing with one of the PDB files from https://www.wwpdb.org/ftp/pdb-ftp-sites. I use the seqres file, but I am not linking the exact file, as it would start downloading immediately). Everything seems to work up to that point. Next is the function that will be used in the Pool:

def createCollectionPDB(fP):
    lineName = ""
    lineFASTA = ""
    colName = ""
    PDBName = ""
    chainIDName = ""
    typeOfMol = ""
    molLen = ""
    proteinName = ""
    for i in fP:
        print("test", i)
        print(lineName)
        if ">" in i:
            # Header line: parse the metadata fields.
            lineName = i.strip()
            print("LINE NAME")
            colName = lineName.split(" ")[0].strip()[1:]
            print("COLNAME", colName)
            PDBName = lineName.split("_")[0].strip()
            chainIDName = colName.split("_")[-1].strip()
            typeOfMol = lineName.split(" ")[1].strip().split(":")[1].strip()
            molLen = lineName.split(" ")[2].strip().split(":")[-1].strip()
            proteinName = lineName.split(" ")[-1].strip()
            print(colName, PDBName, chainIDName, typeOfMol, molLen, proteinName)
        else:
            # Sequence line: write one document into a collection named after the header.
            print("ELSE", colName)
            lineFASTA = i.strip()
            dict2Write = {"PDB_ID": PDBName, "Chain_ID": chainIDName,
                          "Molecule Type": typeOfMol, "Length": molLen,
                          "Protein_Name": proteinName, "FASTA": lineFASTA}
            myNewDB = myClient["MyPrjPrj_PDBs"]
            newCol = myNewDB[colName]
            x = newCol.insert_one(dict2Write)
            print("PDB", x.inserted_id)

That one used to work as well. Finally, I run the multiprocessing part:

f1 = listFile("datum/pdb_seqres.txt")
myPool = Pool(processes=x)
myPool.map(createCollectionPDB, f1)
myPool.close()  # close() must be called before join()
myPool.join()

I have been looking through various solutions, like changing the Python version, trying different MongoDB versions (5.0 and 4.x), and restarting MongoDB. I have also tried changing the number of processes, which leaves me with pretty much the same error, though it stops at a different line. Another option I tried was ssh_pymongo, with no luck either. The code does work without multiprocessing, though in that case I run it on a smaller file.

Upvotes: 0

Views: 168

Answers (1)

D. SM

Reputation: 14530

Each process needs to have its own client; therefore, you most likely need to create the client in each process instead of creating one before invoking multiprocessing.

"Forked process: Failure during socket delivery: Broken pipe" contains general information on how MongoDB drivers handle forking.
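A minimal sketch of one way to do this, using a Pool initializer so that every worker process creates its own MongoClient after the fork (the database name is taken from the question; the worker body is simplified for illustration):

from multiprocessing import Pool, cpu_count
from pymongo import MongoClient

myClient = None  # set separately in each worker process by the initializer

def initWorker():
    # Runs once inside each worker process, after the fork,
    # so every process gets its own client and connection pool.
    global myClient
    myClient = MongoClient('mongodb://127.0.0.1:27017/')

def createCollectionPDB(fP):
    # Simplified version of the worker from the question:
    # insert one document using this process's own client.
    header, sequence = fP[0], fP[1]
    colName = header.split(" ")[0][1:]
    myNewDB = myClient["MyPrjPrj_PDBs"]
    myNewDB[colName].insert_one({"FASTA": sequence})

if __name__ == "__main__":
    f1 = listFile("datum/pdb_seqres.txt")  # listFile() as defined in the question
    with Pool(processes=cpu_count(), initializer=initWorker) as myPool:
        myPool.map(createCollectionPDB, f1)

A client created in the parent holds sockets and background monitoring threads that do not survive the fork, which is why each child process should open its own connection.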

Upvotes: 2
