2yan

Reputation: 307

Function through multiprocessing returns the input in Python

Divide and Conquer algorithm

-- takes a function and a list as its inputs; returns function(list)

This bit is simple; it gets cooler in that it uses the multiprocessing module to split the list up, process each chunk in its own process, and return one single list. (This is the entirety of the .py file; just copy all the code blocks below into a .py file for Python 3 and you should see the problem live.)

Got my imports

import multiprocessing as multi
import numpy as np
import pickle
import os

A way to log things (this doesn't seem to want to work in the process)

def log(text):
    # crude debug marker: creates a file whose name is the message,
    # the file's contents are just a placeholder
    text = str(text)
    with open(text, 'w') as file:
        file.write('Nothing')
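
For what it's worth, a minimal sketch of an alternative that should log from inside the child processes, using the logger that the multiprocessing module itself provides (the message text here is just an example):

import logging
import multiprocessing as multi

# route multiprocessing's own logger to stderr; it is wired up
# for use from child processes as well
logger = multi.log_to_stderr(logging.INFO)
logger.info('visible from parent and child processes alike')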

Function Wrapper

The goal of this function is to take a function and handle providing it data by pulling it from the disk, mostly because Pipes just end up with an error that I cannot find a solution to.

def __wrap(function):
    # each process reads its own chunk from disk, named after the process
    filename = multi.current_process().name + '.tmp'

    with open(filename, 'rb') as file:
        item_list = pickle.load(file)

    result = function(item_list)

    # overwrite the same file with the result for the parent to collect
    with open(filename, 'wb') as file:
        pickle.dump(result, file)
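
For comparison, a minimal Pipe round-trip that does work might look like the sketch below (the worker function and data are illustrative; this is not the code that produced the original error):

import multiprocessing as multi

def worker(conn, items):
    # process the chunk and send the result back through the pipe
    conn.send([item * 2 for item in items])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multi.Pipe()
    p = multi.Process(target=worker, args=(child_conn, [1, 2, 3]))
    p.start()
    print(parent_conn.recv())  # [2, 4, 6]
    p.join()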

The meat and potatoes

This divides the list into smaller lists for each CPU to gobble down, and then starts a little process for each chunk. It saves the input data onto the disk for the __wrap() function to pull up. Finally, it reads back the results that the __wrap() function wrote to disk, concatenates them into a single list, and returns the value.

def divide_and_conquer(f, things):

    cpu_count = multi.cpu_count()
    chunks = np.array_split(things, cpu_count)
    cpus = []

    for cpu in range(cpu_count):
        # write each chunk to disk, named after the process that will read it
        filename = '{}.tmp'.format(cpu)
        with open(filename, 'wb') as file:
            pickle.dump(chunks[cpu], file)
        p = multi.Process(name=str(cpu), target=__wrap, args=(f,))
        p.start()
        cpus.append(p)

    for cpu in cpus:
        cpu.join()

    done = []
    for cpu in cpus:
        filename = '{}.tmp'.format(cpu.name)
        with open(filename, 'rb') as file:
            data = pickle.load(file)
        os.remove(filename)
        done.append(data)

    try:
        # stitch the per-process results back into one array
        done = np.concatenate(done)
    except ValueError:
        # results could not be concatenated; return them chunk by chunk
        pass
    return done

Test Sample

to_do = list(range(10))

def func(thins):
    # dummy workload: touch every item, then return a fixed list
    for thin in thins:
        thin
    return [0, 1, 2, 3]

divide_and_conquer(func, to_do)

This just does not produce the expected output; it returns the input for some reason.

Ultimately, my goal with this is to speed up long-running computations. I often find myself dealing with lists where each item takes a couple of seconds to parse (web scraping, etc.). I pretty much just want to add this tool to my little 'often-used code snippets' library so I can just import and go

"rt.divide_and_conquer(tough_function, really_long_list)"

and see an easy eight-fold improvement. I'm currently seeing issues with this working on Windows (I haven't gotten around to testing it on my Linux box yet), and my reading around has shown me that Linux and Windows apparently handle multiprocessing differently.
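
From what I've read, the key difference is the process start method: Linux forks the parent by default, while Windows spawns a fresh interpreter that re-imports the module, so module-level calls (like my divide_and_conquer(func, to_do) test above) would run again in every child unless the entry point is guarded, something like:

if __name__ == '__main__':
    # only the original process runs the test, not the spawned workers
    divide_and_conquer(func, to_do)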

Upvotes: 0

Views: 46

Answers (1)

Max Mikhaylov

Reputation: 792

You don't need to reinvent the wheel. If I understand what you are trying to achieve correctly, then the concurrent.futures module is what you need.

ProcessPoolExecutor does the job of splitting up a list, launching multiple processes (one worker per CPU with the default settings) and applying a function to each element in those lists.
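
For illustration, a minimal sketch of that approach (tough_function here is a made-up stand-in for your slow per-item work; executor.map handles the chunking, pickling and result collection for you):

from concurrent.futures import ProcessPoolExecutor

def tough_function(item):
    # stand-in for a slow per-item computation (e.g. parsing one page)
    return item * 2

if __name__ == '__main__':  # required on Windows, where workers are spawned
    # one worker process per CPU by default
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(tough_function, range(10)))
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]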

Upvotes: 1
