Aayush Scet
Aayush Scet

Reputation: 37

How to share a global variable with another script in multiprocessing?

Question: How can I use the variable x in script2? I have two scripts: the first contains two multiprocessing functions and the second contains one multiprocessing function. How can I use a shared variable across all three multiprocessing functions?

script1.py

from script2 import function3
# NOTE(review): `multiprocessing` is used below but never imported in
# this script -- as posted it raises NameError at the Process() lines.
x = None
def function1():
    global x
    # NOTE(review): `global` only shares x within one process; each
    # multiprocessing.Process gets its own copy of the module state, so
    # updates made here are never seen by function2 or function3.
    while True:
        x = input()  # updates global variable x

def function2():
    global x
    while True:
        print(x)     # prints global variable x

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()

# some condition to stop all processes

script2.py

def function3():
    # NOTE(review): `x` is not defined anywhere in script2, so this
    # raises NameError as posted; even with sharing in place, x could
    # still be None here, making x*2 a TypeError -- needs a guard.
    while True:      
        print(x*2)   # prints global variable x*2

Upvotes: 0

Views: 924

Answers (1)

Booboo
Booboo

Reputation: 44313

Here is an example of creating a shared managed string value per the comment offered by @martineau.

On a platform such as Linux, where fork is used by default to create new processes, you could code:

import multiprocessing
from ctypes import c_char_p

# Managed string value shared between processes via a manager proxy.
# NOTE(review): the temporary Manager() is not bound to a name; keeping
# a reference to the manager is safer so its server process cannot be
# shut down while proxies are still in use -- confirm for your version.
s = multiprocessing.Manager().Value(c_char_p, '')
event = multiprocessing.Event()

def function1():
    # Producer: publish a new string through the managed proxy.
    s.value = 'New value'  # updates global variable s
    event.set() # show we have a new value

def function2():
    # Consumer: block until the producer signals, then print the value.
    event.wait() # wait for new s value
    print(s.value)

# fork start method: the children inherit s and event from module
# level, so no arguments need to be passed to the processes.
p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()

Prints:

New value

On platforms such as Windows, where spawn is used to create new processes, the shared string is passed as an argument to the processes to ensure that only one instance of the string is created.

import multiprocessing
from ctypes import c_char_p

def function1(s, event):
    """Producer: store a new string in shared value *s*, then signal *event*."""
    s.value = 'New value'   # publish the new string
    event.set()             # notify the consumer that s has been updated

def function2(s, event):
    """Consumer: block until *event* is signalled, then print the shared value."""
    event.wait()  # sleep until the producer publishes a value
    print(s.value)

# I need this for Windows: without the __main__ guard each spawned
# child re-imports this module from the top and would create new
# processes ad infinitum.
if __name__ == '__main__':
    # Shared objects are created under the guard and passed explicitly,
    # because spawned children do not inherit module-level state.
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Prints:

New value

The if __name__ == '__main__': check above is needed or else we would get into a recursive loop because our newly created processes start executing the source from the top and without that check would create new processes ad infinitum. And for that reason the definitions of s and event cannot be outside that check or else each newly created process would be creating its own instance of these variables. But that means we now have to be passing these variables as arguments whereas in the forking example they can just be inherited.

Update: Creating a Shared numpy Array on Linux/Unix

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Return a numpy view over *shared_array* (zero-copy), reshaped to *shape*."""
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def to_shared_array(arr, ctype):
    """Allocate a lock-free shared Array and copy *arr*'s contents into it."""
    shared = multiprocessing.Array(ctype, arr.size, lock=False)
    np.frombuffer(shared, dtype=arr.dtype)[:] = arr.flatten(order='C')
    return shared

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    # Writer: set every element of the shared array to 1.
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    # Reader: wait for the writer's signal, then print the array.
    event.wait() # wait for new arr value
    print('arr =', arr)

# fork start method: children inherit arr, shape and event from module
# level, so the processes need no arguments.
p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
print('arr =', arr)

Prints:

arr = [[1 1 1]
 [1 1 1]]
arr = [[1 1 1]
 [1 1 1]]

Creating a Shared numpy Array on Windows

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Wrap *shared_array* in a numpy view (no copy) and give it *shape*."""
    flat_view = np.ctypeslib.as_array(shared_array)
    return flat_view.reshape(shape)

def to_shared_array(arr, ctype):
    """Copy *arr* into a newly allocated shared-memory Array (no lock)."""
    backing = multiprocessing.Array(ctype, arr.size, lock=False)
    view = np.frombuffer(backing, dtype=arr.dtype)
    view[:] = arr.flatten(order='C')
    return backing

def function1(arr, event):
    """Set each element of shared array *arr* to 1, then signal *event*."""
    n_rows, n_cols = arr.shape[0], arr.shape[1]
    for i in range(n_rows):
        for j in range(n_cols):
            arr[i, j] = 1
    event.set()  # announce that the array has been updated

def function2(arr, event):
    """Block until *event* is set, then print the shared array."""
    event.wait()  # sleep until the producer has filled the array
    print('arr =', arr)

if __name__ == '__main__':
    # Guard required under spawn: children re-import this module, so the
    # process-creation code must not run at import time.
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()

    # NOTE(review): under spawn, pickling the numpy *view* may copy the
    # data rather than share the underlying buffer; passing shared_array
    # and re-wrapping in the child (as the pool example in this answer
    # does) is the safer pattern -- verify on the target platform.
    p1 = multiprocessing.Process(target=function1, args=(arr, event))
    p2 = multiprocessing.Process(target=function2, args=(arr, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)

Using a Shared numpy Array With a Multiprocessing Pool on Windows

When using a multiprocessing pool — whether you pass the array as an argument to the worker function or, as in this case, use it to initialize a global variable for each process in the pool — you must pass the shared array itself to each process and recreate a numpy array from it.

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Return a reshaped numpy view sharing memory with *shared_array*."""
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def to_shared_array(arr, ctype):
    """Create a lock-free shared Array initialized from *arr* (row-major copy)."""
    shared = multiprocessing.Array(ctype, arr.size, lock=False)
    np.frombuffer(shared, dtype=arr.dtype)[:] = arr.flatten(order='C')
    return shared

def init_pool(shared_array, the_shape, the_event):
    """Pool initializer: stash the shared objects in this worker's globals."""
    global arr, shape, event
    shape = the_shape
    event = the_event
    # Rebuild a numpy view over the shared memory for this process.
    arr = to_numpy_array(shared_array, shape)

def function1():
    """Fill the per-process global array with 1s, then signal the event."""
    n_rows, n_cols = shape[0], shape[1]
    for i in range(n_rows):
        for j in range(n_cols):
            arr[i, j] = 1
    event.set()  # tell the waiting task that the array has been updated

def function2():
    """Block on the shared event, then print the per-process global array."""
    event.wait()  # sleep until function1 signals completion
    print('arr =', arr)

if __name__ == '__main__':
    # Guard required under spawn so that pool workers re-importing this
    # module do not re-run the pool-creation code.
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    # The raw shared_array (not the numpy view) is passed to the pool
    # initializer; each worker re-wraps it in init_pool.
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)

Using a Shared numpy Array With a Multiprocessing Pool on Linux/Unix

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Expose *shared_array* as a numpy array of the given *shape* (zero-copy)."""
    as_np = np.ctypeslib.as_array(shared_array)
    return as_np.reshape(shape)

def to_shared_array(arr, ctype):
    """Build a shared-memory Array (lock-free) holding a C-order copy of *arr*."""
    backing = multiprocessing.Array(ctype, arr.size, lock=False)
    dst = np.frombuffer(backing, dtype=arr.dtype)
    dst[:] = arr.flatten(order='C')
    return backing

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    # Writer task: set every element of the shared array to 1.
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    # Reader task: wait for the writer's signal, then print the array.
    event.wait() # wait for new arr value
    print('arr =', arr)

# fork-based Pool: workers inherit arr, shape and event from module
# level, so no initializer or arguments are needed.
pool = multiprocessing.Pool(2)
pool.apply_async(function1)
pool.apply_async(function2)
# wait for tasks to complete
pool.close()
pool.join()
print('arr =', arr)

Upvotes: 2

Related Questions