Muhammad Sajid Iqbal

Reputation: 93

multiprocessing to a python function

How can I apply multiprocessing to my function? I tried it like this, but it did not work.

def steric_clashes_parallel(system):
    rna_st = system[MolWithResID("G")].molecule()
    for i in system.molNums():
        peg_st = system[i].molecule()
        if rna_st != peg_st:
            print(peg_st)
            for i in rna_st.atoms(AtomIdx()):
                for j in peg_st.atoms(AtomIdx()):
#                    print(Vector.distance(i.evaluate().center(), j.evaluate().center()))
                    dist = Vector.distance(i.evaluate().center(), j.evaluate().center())
                    if dist<2:
                        return print("there is a steric clash")
    return print("there is no steric clashes")  

mix = PDB().read("clash_1.pdb")
system = System()
system.add(mix)    
from multiprocessing import Pool
p = Pool(4)
p.map(steric_clashes_parallel,system)

I have thousands of pdb/system files to run through this function. It took 2 h for one file on a single core without the multiprocessing module. Any suggestion would be a great help.

My traceback looks something like this:

self.run()
  File "/home/sajid/sire.app/bundled/lib/python3.3/threading.py", line 858, in run
    self._target(*self._args, **self._kwargs)
  File "/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/pool.py", line 351, in _handle_tasks
    put(task)
  File "/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/connection.py", line 206, in send
    ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
RuntimeError: Pickling of "Sire.System._System.System" instances is not enabled
(http://www.boost.org/libs/python/doc/v2/pickle.html)

Upvotes: 0

Views: 1052

Answers (3)

Dunes

Reputation: 40683

There is one trick you can do to get a faster computation for each file -- processing each file sequentially, but processing the contents of the file in parallel. This relies on a number of caveats:

  1. You are running on a system that can fork processes (such as Linux).
  2. The computations you are doing do not have side effects that affect the results of future computations.

It seems like this is the case in your situation, but I can't be 100% sure.

When a process is forked, all the memory in the child process is duplicated from the parent process (and it is duplicated efficiently -- pages of memory that are only read from are never actually copied). This makes it easy to share a big, complex initial state between processes. However, once the child processes have started, they will not see any changes made to objects in the parent process (and vice versa).

Sample code:

import multiprocessing

system = None
rna_st = None

class StericClash(Exception):
    """Exception used to halt processing of a file. Could be modified to 
    include information about what caused the clash if this is useful."""
    pass


def steric_clashes_parallel(system_index):
    peg_st = system[system_index].molecule()
    if rna_st != peg_st:
        for i in rna_st.atoms(AtomIdx()):
            for j in peg_st.atoms(AtomIdx()):
                dist = Vector.distance(i.evaluate().center(), 
                    j.evaluate().center())
                if dist < 2:
                    raise StericClash()


def process_file(filename):
    global system, rna_st

    # initialise global values before creating pool     
    mix = PDB().read(filename)
    system = System()
    system.add(mix)
    rna_st = system[MolWithResID("G")].molecule()

    with multiprocessing.Pool() as pool:
        # contents of file processed in parallel
        try:
            pool.map(steric_clashes_parallel, system.molNums())
        except StericClash:
            # terminate called to halt current jobs and further processing 
            # of file
            pool.terminate()
            # wait for pool processes to terminate before returning
            pool.join()
            return False
        else:
            pool.close()
            pool.join()
            return True
        finally:
            # reset globals
            system = rna_st = None

if __name__ == "__main__":
    for filename in get_files_to_be_processed():
        # files are being processed in serial
        result = process_file(filename)
        save_result_to_disk(filename, result)

Upvotes: 0

Muhammad Sajid Iqbal

Reputation: 93

@martineau: I tested the pickle command and it gave me:

----> 1 pickle.dumps(clash_1.pdb)
RuntimeError: Pickling of "Sire.Mol._Mol.MoleculeGroup" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)

----> 1 pickle.dumps(system)
RuntimeError: Pickling of "Sire.System._System.System" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)

With your script it took the same time, using a single core only. The dist line is iterable, though. Can I run this single line over multiple cores? I modified the line as:

for i in rna_st.atoms(AtomIdx()):
    icent = i.evaluate().center()
    for j in peg_st.atoms(AtomIdx()):
        dist = Vector.distance(icent, j.evaluate().center())

Upvotes: 0

tdelaney

Reputation: 77337

The problem is that Sire.System._System.System can't be serialized, so it can't be sent to the child process. multiprocessing uses the pickle module for serialization, and you can frequently do a sanity check in the main program with pickle.dumps(my_mp_object) to verify.
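That sanity check can be wrapped in a tiny helper. A minimal sketch (the BoostLike class is a hypothetical stand-in for a Sire object; an unpicklable lambda attribute plays the role of the Boost.Python internals):

```python
import pickle

def can_pickle(obj):
    """Sanity check: return True if obj survives pickle.dumps."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

class BoostLike:
    # hypothetical stand-in for a Sire System: instances carrying an
    # unpicklable attribute (here a lambda) fail to serialize
    def __init__(self):
        self.handle = lambda: None

plain_ok = can_pickle({"atoms": [1, 2, 3]})  # plain data pickles fine
sire_ok = can_pickle(BoostLike())            # fails, like the Sire objects
print(plain_ok, sire_ok)  # -> True False
```

Anything that fails this check cannot be passed as a Pool.map argument or returned from a worker.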

You have another problem, though (or I think you do, based on variable names). The map method takes an iterable and fans its iterated objects out to pool members, but it appears that you want to process system itself, not something it iterates over.

One trick to multiprocessing is to keep the payload that you send from the parent to the child simple and let the child do the heavy lifting of creating its objects. Here, you might be better off just sending down filenames and letting the children do most of the work.

def steric_clashes_from_file(filename):
    mix = PDB().read(filename)
    system = System()
    system.add(mix)    
    steric_clashes_parallel(system)

def steric_clashes_parallel(system):
    rna_st = system[MolWithResID("G")].molecule()
    for i in system.molNums():
        peg_st = system[i].molecule()
        if rna_st != peg_st:
            print(peg_st)
            for i in rna_st.atoms(AtomIdx()):
                for j in peg_st.atoms(AtomIdx()):
#                    print(Vector.distance(i.evaluate().center(), j.evaluate().center()))
                    dist = Vector.distance(i.evaluate().center(), j.evaluate().center())
                    if dist<2:
                        return print("there is a steric clash")
    return print("there is no steric clashes")  

filenames = ["clash_1.pdb",]
from multiprocessing import Pool
p = Pool(4)
p.map(steric_clashes_from_file, filenames, chunksize=1)

Upvotes: 3
