Karlson

Reputation: 3048

Multiprocess pool initialization with sequential initializer argument

I have some code like the following:

import multiprocessing as mp

connection: module.Connection  # `module` stands for the actual client library

def client_id():
    for i in range(mp.cpu_count()*2):
        yield i

def initproc(host: str, port: int, client_id: int):
    global connection
    connection.connect(host, port, client_id)

def main():
    host = "something"
    port = 12345
    # initargs receives the generator object itself, not the next number from it
    with mp.get_context("spawn").Pool(processes=mp.cpu_count()*2,
                                      initializer=initproc,
                                      initargs=(host, port, client_id())) as p:
        res = p.starmap(processing_function, arg_list)
    

For the purposes of the question, processing_function and arg_list are not relevant.

The issue is that I get an error with this:

    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object
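With the "spawn" start method, the pool pickles the initializer and its initargs so that each new worker process can rebuild them, and generator objects cannot be pickled. A quick check outside multiprocessing shows the same failure (the `is_picklable` helper is only for illustration):

```python
import pickle


def client_id(n):
    # Same shape as the question's generator, with the bound passed in
    for i in range(n):
        yield i


def is_picklable(obj):
    """Return True if pickle can serialize obj, False if it raises TypeError."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


print(is_picklable(client_id(4)))    # False: generator objects cannot be pickled
print(is_picklable(list(range(4))))  # True
```

Even with a start method that does not pickle initargs ("fork"), each worker would get its own copy of the generator, so if every worker called next() on it they would all get 0. The sequence has to live in state shared between processes, such as a queue or a shared counter.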

Is there any way to create and initialize the processes in the pool so that one of the initializer's arguments is the next number in a sequence?

P.S. In the code as written it may be possible to initialize all connection objects outside of the initializer function, but in my particular case it is not. I need to pass the connection arguments into the initializer.

Upvotes: 3

Views: 1262

Answers (2)

mvds

Reputation: 47034

Although the accepted answer works, it relies on implementation details and is therefore not very reliable.

A different approach is to fill a Queue with the ids before creating the Pool and have each worker's initializer take one with q.get():

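from multiprocessing import Pool, Queue

worker_id = None  # set once in each worker by worker_init

def worker_init(q):
    global worker_id
    worker_id = q.get()  # take the next unused id from the queue

def work(w):
    print("worker #%d working on %s" % (worker_id, w), flush=True)

if __name__ == "__main__":  # required when workers are started with "spawn"
    n = 3
    q = Queue(n)
    for i in range(n):
        q.put(i)
    with Pool(n, initializer=worker_init, initargs=(q,)) as pool:
        pool.map(work, range(10, 10 + 3 * n))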

which should output something similar to the following (the order varies between runs):

worker #0 working on 10
worker #2 working on 12
worker #1 working on 11
worker #2 working on 13
worker #0 working on 14
worker #2 working on 16
worker #1 working on 15
worker #2 working on 17
worker #0 working on 18

Upvotes: 0

Darkonaut

Reputation: 21654

A simple solution for your case would be to use the sequence number of the child process, which is contained in its Process.name. You could extract it with:

mp.current_process().name.split('-')[1]
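Note that the number in the name is not a per-pool index: in CPython it comes from a counter in the parent process that is incremented for every Process created, so a second pool (or a replacement worker) continues where the previous one stopped. The sketch below parses the number as an int and shows this with two consecutive pools; `worker_number` and `numbers_seen` are illustrative names, and the `<Prefix>-<n>` name format is an implementation detail.

```python
import multiprocessing as mp


def worker_number(_task=None):
    """Sequence number from the current process name, e.g. 'SpawnPoolWorker-3' -> 3.

    Relies on the CPython naming scheme '<Prefix>-<n>', an implementation detail.
    """
    return int(mp.current_process().name.rsplit("-", 1)[1])


def numbers_seen(ctx, processes):
    # Collect the worker numbers that actually ran tasks in a fresh pool
    with ctx.Pool(processes) as pool:
        return set(pool.map(worker_number, range(8 * processes)))


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    first = numbers_seen(ctx, 2)
    second = numbers_seen(ctx, 2)
    print("first pool:", sorted(first))
    print("second pool:", sorted(second))
```

The numbers are unique within one parent process, but they need not start at 1 or be contiguous, which matters if they are used directly as client ids.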

If you need more control over where the sequence starts, you could use a multiprocessing.Value as a counter from which each worker takes its unique number.

import multiprocessing as mp
import time


def init_p(client_id):
    # Hold the Value's lock so that reading and incrementing is atomic across workers
    with client_id.get_lock():
        globals()['client_id'] = client_id.value  # this worker's number, as a global
        print(f"{mp.current_process().name},"
              f" {mp.current_process().name.split('-')[1]},"  # alternative
              f" client_id:{globals()['client_id']}", flush=True)
        client_id.value += 1


if __name__ == "__main__":

    ctx = mp.get_context("spawn")
    client_ids = ctx.Value('i', 0)

    with ctx.Pool(
            processes=4,
            initializer=init_p,
            initargs=(client_ids,)
    ) as pool:

        time.sleep(3)  # keep the pool open long enough for every worker to initialize

Output:

SpawnPoolWorker-2, 2, client_id:0
SpawnPoolWorker-3, 3, client_id:1
SpawnPoolWorker-1, 1, client_id:2
SpawnPoolWorker-4, 4, client_id:3

Process finished with exit code 0
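Wired into the question's initializer, the `Value` counter might look like the sketch below. `FIRST_CLIENT_ID` is a hypothetical starting value, `connection_args` stands in for a real connection object, and the pool is closed and joined (rather than terminated) so every worker finishes its initializer before the final counter value is read.

```python
import multiprocessing as mp
import os

FIRST_CLIENT_ID = 100  # hypothetical start of the id sequence

connection_args = None  # hypothetical stand-in for a per-worker connection object


def initproc(host, port, counter):
    global connection_args
    # Hold the Value's lock so that reading and incrementing is one atomic step
    with counter.get_lock():
        client_id = counter.value
        counter.value += 1
    # A real worker would call connection.connect(host, port, client_id) here
    connection_args = (host, port, client_id)


def report(_task):
    # Return which worker ran the task and the client id it was given
    return os.getpid(), connection_args[2]


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    counter = ctx.Value("i", FIRST_CLIENT_ID)
    n = 4
    pool = ctx.Pool(n, initializer=initproc,
                    initargs=("something", 12345, counter))
    try:
        results = pool.map(report, range(4 * n))
    finally:
        pool.close()
        pool.join()
    next_id = counter.value  # every worker has run initproc by now
    print(sorted(set(results)))
    print("next unused id:", next_id)
```

Compared with the queue, the counter has no fixed supply of ids, so workers started later by the pool simply take the next number.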

Upvotes: 2
