Reputation: 3048
I have some code like the following:
import multiprocessing as mp

connection: module.Connection

def client_id():
    for i in range(mp.cpu_count() * 2):
        yield i

def initproc(host: str, port: int, client_id: int):
    global connection
    connection.connect(host, port, client_id)

def main():
    host = "something"
    port = 12345
    with mp.get_context("spawn").Pool(processes=mp.cpu_count() * 2,
                                      initializer=initproc,
                                      initargs=(host, port, client_id())) as p:
        res = p.starmap(processing_function, arg_list)
For the purposes of the question, processing_function and arg_list are not relevant.
The issue is that I get an error with this:
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object
Is there any way to create and initialize the processes in the pool such that one of the arguments to the initializer is the next number in a sequence?
P.S. In the code as written it may be possible to initialize all connection objects outside of the initializer function but in my particular instance it is not. I need to pass arguments for connection into the initializer.
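For reference, the error can be reproduced without a pool. With the "spawn" start method the initargs tuple has to be pickled to reach each child process, and generator objects cannot be pickled. A minimal sketch (the helper name try_pickle is made up for illustration):

```python
import pickle


def client_id():
    # Same shape as the generator in the question, with a fixed range.
    for i in range(4):
        yield i


def try_pickle(obj):
    """Return the error message if obj cannot be pickled, else None."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    # The generator object fails to pickle; plain values such as host and port do not.
    print(try_pickle(client_id()))
    print(try_pickle(("something", 12345)))
```

So the host and port arguments are fine on their own; only the generator in initargs triggers the TypeError.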
Upvotes: 3
Views: 1262
Reputation: 47034
Although the accepted answer works, it appears to rely on implementation details and is therefore not very reliable.
A different approach is to establish a Queue before calling Pool.map():
from multiprocessing import Pool, Queue

worker_id = None

def worker_init(q):
    global worker_id
    worker_id = q.get()

def work(w):
    print("worker #%d working on %s" % (worker_id, w))

if __name__ == "__main__":
    n = 3
    q = Queue(n)
    for i in range(n):
        q.put(i)
    pool = Pool(n, initializer=worker_init, initargs=(q,))
    pool.map(work, range(10, 10 + 3 * n))
which should output something similar to:
worker #0 working on 10
worker #2 working on 12
worker #1 working on 11
worker #2 working on 13
worker #0 working on 14
worker #2 working on 16
worker #1 working on 15
worker #2 working on 17
worker #0 working on 18
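The same idea should carry over to the setup in the question. Below is a minimal sketch, assuming the "spawn" context used there. The names id_queue, connection_args and report are made up for illustration, and the real connection.connect(...) call is only indicated in a comment. The queue is created from the same context as the pool, since, as far as I know, a queue created in a different context cannot be handed to spawned workers.

```python
import multiprocessing as mp

# Per-process global, set once in each worker by the initializer.
connection_args = None


def initproc(host, port, id_queue):
    """Pool initializer: take one unique client id from the shared queue."""
    global connection_args
    client_id = id_queue.get()
    # A real initializer would call connection.connect(host, port, client_id) here.
    connection_args = (host, port, client_id)


def report(_):
    # Stand-in for processing_function: return what this worker was initialized with.
    return connection_args


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    n = 4
    id_queue = ctx.Queue()
    for i in range(n):
        id_queue.put(i)
    with ctx.Pool(processes=n,
                  initializer=initproc,
                  initargs=("something", 12345, id_queue)) as pool:
        seen = pool.map(report, range(4 * n))
    # Which ids show up depends on which workers happened to pick up tasks.
    print(sorted({client_id for _, _, client_id in seen}))
```

Each worker takes exactly one id from the queue, so no two workers share an id, but the order in which workers receive ids is not deterministic.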
Upvotes: 0
Reputation: 21654
A simple solution for your case would be to use the sequential number of the child process, which is contained in Process.name. You could extract it with:
mp.current_process().name.split('-')[1]
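As a rough sketch of how that could be wired into an initializer (worker_index is a hypothetical helper, and the print stands in for the real connect call): note that the name format is not a documented guarantee, the numbering normally starts at 1 rather than 0, and workers that the pool replaces later get new numbers.

```python
import multiprocessing as mp


def worker_index(name: str) -> int:
    """Parse the trailing counter from a worker name such as 'SpawnPoolWorker-3'."""
    return int(name.rsplit("-", 1)[1])


def initproc(host: str, port: int):
    # Derive the client id from this worker's process name.
    client_id = worker_index(mp.current_process().name)
    # A real initializer would call connection.connect(host, port, client_id) here.
    print(f"would connect to {host}:{port} as client {client_id}")


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2,
                  initializer=initproc,
                  initargs=("something", 12345)) as pool:
        pool.map(abs, range(4))
```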
If you need more control over where the sequence starts, you could use multiprocessing.Value as a counter from which workers get their unique number.
import multiprocessing as mp
import time

def init_p(client_id):
    with client_id.get_lock():
        globals()['client_id'] = client_id.value
        print(f"{mp.current_process().name},"
              f" {mp.current_process().name.split('-')[1]},"  # alternative
              f" client_id:{globals()['client_id']}")
        client_id.value += 1

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    client_ids = ctx.Value('i', 0)
    with ctx.Pool(
            processes=4,
            initializer=init_p,
            initargs=(client_ids,)
    ) as pool:
        time.sleep(3)
Output:
SpawnPoolWorker-2, 2, client_id:0
SpawnPoolWorker-3, 3, client_id:1
SpawnPoolWorker-1, 1, client_id:2
SpawnPoolWorker-4, 4, client_id:3
Process finished with exit code 0
Upvotes: 2