-- takes a function and a list as its inputs; returns function(list)
The idea is simple, but it gets cooler: it uses the multiprocessing module to split the list up, process the chunks in separate processes, and return one single list. (The code blocks below make up the entirety of the .py file; copy them all into one Python 3 file and you should see the problem live.)
import multiprocessing as multi
import numpy as np
import pickle
import os

def log(text):
    # Debug helper: creates a file named after the given text.
    text = str(text)
    with open(text, 'w') as file:
        file.write('Nothing')
The goal of this function is to take a function and feed it its data by pulling that data from disk, mostly because Pipes just end up with an error that I cannot find a solution to.
def __wrap(function):
    # Load this worker's chunk from disk, apply the function,
    # and write the result back to the same file.
    filename = multi.current_process().name + '.tmp'
    with open(filename, 'rb') as file:
        item_list = pickle.load(file)
    result = function(item_list)
    with open(filename, 'wb') as file:
        pickle.dump(result, file)
This divides the list into smaller lists, one for each CPU to gobble down, and starts a little process for each chunk. It saves the input data to disk for the __wrap() function to pull up. Finally, it reads back the results that __wrap() has written to disk, concatenates them into a single list, and returns the value.
def divide_and_conquer(f, things):
    cpu_count = multi.cpu_count()
    # Split the input into one chunk per CPU and dump each chunk to disk
    # for the matching worker process to pick up.
    chunks = np.array_split(things, cpu_count)
    cpus = []
    for cpu in range(cpu_count):
        filename = '{}.tmp'.format(cpu)
        with open(filename, 'wb') as file:
            pickle.dump(chunks[cpu], file)
        p = multi.Process(name=str(cpu), target=__wrap, args=(f,))
        p.start()
        cpus.append(p)
    for cpu in cpus:
        cpu.join()
    # Read each worker's result back from disk and clean up the files.
    done = []
    for cpu in cpus:
        filename = '{}.tmp'.format(cpu.name)
        with open(filename, 'rb') as file:
            data = pickle.load(file)
        os.remove(filename)
        done.append(data)
    try:
        done = np.concatenate(done)
    except ValueError:
        pass
    return done
to_do = list(range(10))

def func(things):
    # Stand-in for a slow per-item computation.
    for thing in things:
        thing
    return [0, 1, 2, 3]

print(divide_and_conquer(func, to_do))
This just does not produce the expected output. I would expect func's return value, [0, 1, 2, 3], concatenated once per chunk, but for some reason it just returns the input.
Ultimately my goal with this is to speed up long-running computations. I often find myself dealing with lists where each item takes a couple of seconds to process (web scraping etc.). I pretty much just want to add this tool to my little 'often used code snippets' library so I can just import and go
"rt.divide_and_conquer(tough_function, really_long_list)"
and see an easy 8-fold improvement. I'm currently seeing issues getting this to work on Windows (I haven't gotten around to testing it on my Linux box yet), and my reading around has shown me that Linux and Windows apparently handle multiprocessing differently.
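From what I've read, the difference is that Windows starts child processes by re-importing the script ('spawn'), so module-level code runs again in every child. If that is right, a minimal sketch of the fix would be to guard the entry point (untested on my end):

if __name__ == '__main__':
    # On Windows each child process re-imports this module; without this
    # guard the module-level call would run again in every child.
    print(divide_and_conquer(func, to_do))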
You don't need to reinvent the wheel. If I understand what you are trying to achieve correctly, then the concurrent.futures module is what you need.
ProcessPoolExecutor does the job of splitting up a list, launching multiple processes (by default, as many worker processes as you have CPUs) and applying a function to each element in those lists.
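A minimal sketch of how that could look for your case (tough_function here is just a stand-in for your slow per-item work):

from concurrent.futures import ProcessPoolExecutor

def tough_function(item):
    # Stand-in for the slow per-item work (parsing, web scraping, ...).
    return item * 2

if __name__ == '__main__':
    really_long_list = list(range(10))
    # By default the pool uses as many worker processes as there are CPUs.
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(tough_function, really_long_list))
    print(results)

Note that executor.map takes care of chunking the input and of shipping data to and from the worker processes, so no temporary files are needed.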