tomtom

Reputation: 3

Python - shared memory and multiprocessing queue

I'm comparing codes from multiple data sets, assigning a new code when a row's code is already taken, or keeping the existing one if it is unused. I need to integrate a queue or shared memory so multiple processes can run on different shards at the same time. As written, the script hands out the same "new code" multiple times.

import pyspark.sql.functions as F
import pyspark.sql.types as T
import random

used_codes = []
new_codes = []


def generate_code():
    # Return the code as a string so it matches the UDF's StringType;
    # returning an int from a StringType UDF yields null in Spark.
    return str(random.randint(1000, 9999))


def create_codes():
    global new_codes
    new_codes = [generate_code() for _ in range(10)]


def get_code(code):
    # Keep the row's existing code if it is still free; otherwise hand
    # out the next pre-generated code that has not been used yet.
    if code not in used_codes:
        used_codes.append(code)
        return code
    created_code = new_codes.pop(0)
    while created_code in used_codes:
        created_code = new_codes.pop(0)  # raises IndexError if we run out
    used_codes.append(created_code)      # record it so it is never reissued
    return created_code


get_code_udf = F.udf(get_code, T.StringType())
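For context on why the globals above misbehave under multiprocessing: each worker process gets its own copy of `used_codes`, so appends made in one process are invisible to the others (and to the driver). A minimal, hypothetical demonstration outside of Spark (the code value 1234 is made up):

```python
import multiprocessing as mp

used_codes = []  # module-level state: every process works on its own copy


def claim(code, seen_q):
    # Runs in a child process: the append below mutates only the
    # child's copy of used_codes, which the parent never sees.
    used_codes.append(code)
    seen_q.put(len(used_codes))


def demo():
    ctx = mp.get_context("fork")  # fork keeps the sketch short (POSIX only)
    q = ctx.Queue()
    p = ctx.Process(target=claim, args=(1234, q))
    p.start()
    child_view = q.get()  # how many codes the child sees: 1
    p.join()
    return child_view, len(used_codes)  # parent still sees 0
```

Calling `demo()` returns `(1, 0)`: the child recorded the code, but the parent's list is still empty, which is exactly why duplicates get sent out.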

Upvotes: 0

Views: 4049

Answers (1)

Mandraenke

Reputation: 3266

Are you using Python 3.8 or later?

See https://docs.python.org/3/library/multiprocessing.shared_memory.html - shared memory fundamentally speaks bytes, but with a little glue code it works quite well, here with NumPy:

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
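Applied to your original problem, a `multiprocessing.Queue` is often simpler than raw shared memory, because `get()` is atomic across processes. A minimal sketch (the 1000-9999 code range, the batch sizes, and all names here are made up for illustration) that pre-fills a queue with unique codes and lets workers pop them:

```python
import multiprocessing as mp
import random


def fill_code_queue(q, n, used):
    # Pre-generate n distinct 4-digit codes that are not in `used`
    # and enqueue them once, before any worker starts.
    free = sorted(set(range(1000, 10000)) - used)
    for code in random.sample(free, n):
        q.put(str(code))


def worker(code_q, result_q):
    # Queue.get() is atomic across processes, so two workers can
    # never receive the same code.
    result_q.put(code_q.get())


def demo():
    ctx = mp.get_context("fork")  # POSIX-only shortcut for the sketch
    code_q, result_q = ctx.Queue(), ctx.Queue()
    fill_code_queue(code_q, 8, used={1234, 5678})
    procs = [ctx.Process(target=worker, args=(code_q, result_q))
             for _ in range(4)]
    for p in procs:
        p.start()
    out = [result_q.get() for _ in range(4)]  # four distinct codes
    for p in procs:
        p.join()
    return out
```

Note this only coordinates processes on one machine; Spark executors on other nodes cannot see the queue, so inside a cluster you would need an external store instead.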

Upvotes: 2
