Reputation: 6891
I am using multiprocessing.Pool()
Here is what I want to run with Pool:
def insert_and_process(file_to_process, db):
    db = DAL("path_to_mysql" + db)
    # Table definitions
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process, file_list, db)  # here is the problem: map() takes a single iterable (its third argument is chunksize)
I want to pass two arguments. What I want is to initialize only 4 DB connections: as written, a connection would be created on every function call (possibly millions of them), freezing the IO to death. If I could create 4 DB connections, one for each process, that would be fine.
Is there any solution for Pool, or should I abandon it?
EDIT:
With the help of both of you, I got this:
args=zip(f,cycle(dbs))
Out[-]:
[('f1', 'db1'),
('f2', 'db2'),
('f3', 'db3'),
('f4', 'db4'),
('f5', 'db1'),
('f6', 'db2'),
('f7', 'db3'),
('f8', 'db4'),
('f9', 'db1'),
('f10', 'db2'),
('f11', 'db3'),
('f12', 'db4')]
So here is how it is going to work: I will move the DB connection code out to the main level and do this:
def process_and_insert(args):
    # Table definitions
    args[1].table.insert(**parse_file(args[0]))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4)
    dbs = [DAL("path_to_mysql/database") for i in range(4)]
    args = zip(file_list, cycle(dbs))
    P.map(process_and_insert, args)
Yeah, I am going to test it out and let you guys know.
Upvotes: 10
Views: 38260
Reputation: 928
You can use functools.partial for this purpose:
from functools import partial

func = partial(rdc, lat, lng)
r = pool.map(func, range(8))
with
def rdc(lat, lng, x):
    pass
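For reference, a minimal runnable sketch of this pattern on Python 3 (the rdc body and the lat/lng values are placeholders, not from the question; on old Python 2 versions a partial object may not pickle cleanly):
from functools import partial
from multiprocessing import Pool

def rdc(lat, lng, x):
    # placeholder worker: lat and lng are fixed by partial, x comes from the iterable
    return (lat, lng, x)

if __name__ == "__main__":
    lat, lng = 52.5, 13.4               # hypothetical fixed arguments
    pool = Pool(processes=4)
    func = partial(rdc, lat, lng)       # binds lat and lng; pool.map supplies x
    print(pool.map(func, range(8)))
    pool.close()
    pool.join()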
Upvotes: 3
Reputation: 110271
The Pool documentation does not describe a way of passing more than one parameter to the target function. I've tried just passing a sequence, but it does not get unpacked (one item of the sequence per parameter).
However, you can write your target function to expect the first (and only) parameter to be a tuple, in which each element is one of the parameters you are expecting:
from itertools import repeat

def insert_and_process((file_to_process, db)):
    db = DAL("path_to_mysql" + db)
    # Table definitions
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process, zip(file_list, repeat(db)))
(Note the extra parentheses in the definition of insert_and_process: Python treats that as a single parameter that should be a 2-item sequence. The first element of the sequence is assigned to the first variable, and the other to the second.)
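If you are on Python 3, tuple parameters in a def were removed (PEP 3113), so the same idea becomes unpacking inside the function, or, on Python 3.3+, Pool.starmap, which unpacks each tuple into separate arguments for you. A sketch reusing the question's DAL/parse_file names (db is a placeholder here):
import os
from itertools import repeat
from multiprocessing import Pool

def insert_and_process(args):
    file_to_process, db = args                       # unpack the single tuple argument
    db = DAL("path_to_mysql" + db)                   # DAL and parse_file as in the question
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    db = "database_name"                             # placeholder for the real db string
    P = Pool(processes=4)
    P.map(insert_and_process, zip(file_list, repeat(db)))
    # Python 3.3+: keep the original two-argument signature and let starmap unpack:
    # P.starmap(insert_and_process, zip(file_list, repeat(db)))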
Upvotes: 29
Reputation: 21
Using
params = [(x, y) for x in X for y in Y]
you build the full list of (x, y) combinations in memory, and that may be slower than using
from itertools import repeat
P.map(insert_and_process, zip(file_list, repeat(db)))
Upvotes: 2
Reputation: 4622
No need to use zip. If, for example, you have two parameters, x and y, and each of them can take several values, like:
X=range(1,6)
Y=range(10)
The function should get only one parameter, and unpack it inside:
def func(params):
    (x, y) = params
    ...
And you call it like this:
params = [(x,y) for x in X for y in Y]
pool.map(func, params)
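For completeness, a runnable sketch of this pattern (the body of func is just a placeholder):
from multiprocessing import Pool

def func(params):
    (x, y) = params                      # unpack the single tuple argument inside the worker
    return x * y

if __name__ == "__main__":
    X = range(1, 6)
    Y = range(10)
    params = [(x, y) for x in X for y in Y]   # one tuple per (x, y) combination
    pool = Pool(processes=4)
    results = pool.map(func, params)          # each worker call receives one tuple
    print(len(results))                       # 50 results, one per combination
    pool.close()
    pool.join()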
Upvotes: 5
Reputation: 67147
Your pool will spawn four processes, each run by its own instance of the Python interpreter. You can use a global variable to hold your database connection object, so that exactly one connection is created per process:
global_db = None

def insert_and_process(file_to_process, db):
    global global_db
    if global_db is None:
        # If this is the first time this function is called within this
        # process, create a new connection. Otherwise, the global variable
        # already holds a connection established by a former call.
        global_db = DAL("path_to_mysql" + db)
    global_db.table.insert(**parse_file(file_to_process))
    return True
Since Pool.map()
and friends only support one-argument worker functions, you need to create a wrapper that forwards the work:
def insert_and_process_helper(args):
    return insert_and_process(*args)

if __name__ == "__main__":
    file_list = os.listdir(".")
    db = "wherever you get your db"
    # Create argument tuples for each function call:
    jobs = [(file, db) for file in file_list]
    P = Pool(processes=4)
    P.map(insert_and_process_helper, jobs)
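If the db value does not have to travel through map() at all, another way to get exactly one connection per process is the pool's initializer argument, which runs once in each worker process when the pool starts. A sketch, again reusing the question's DAL and parse_file (the path is assumed):
import os
from multiprocessing import Pool

connection = None   # set once per worker process

def init_worker(db_path):
    # Runs once in every worker; opens that process's single connection.
    global connection
    connection = DAL(db_path)

def insert_and_process(file_to_process):
    connection.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4, initializer=init_worker,
             initargs=("path_to_mysql/database",))
    P.map(insert_and_process, file_list)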
Upvotes: 8