B.Eng

Reputation: 23

Python multiprocessing, cannot iterate with pool.map

I have a 32-core machine with 256 GB of RAM and am new to parallel computing. I have to run this line:

flag = data.flags[:, :, x]

which is a very large matrix. "x" selects the sub-matrix that I want to store to a file. But "x" changes 64 times and each iteration takes about 8 minutes, hence the need to parallelize it.
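To make the indexing concrete: assuming data.flags is a 3-D NumPy-style array whose last axis has 64 entries (one per antenna/polarization product), flags[:, :, x] picks out one 2-D slice, and there are 64 such slices in total. The small synthetic array below is only a stand-in for the real data:

```python
import numpy as np

# Synthetic stand-in for data.flags (assumption: a 3-D array whose last
# axis indexes the 64 antenna/polarization products).
flags = np.zeros((4, 5, 64), dtype=bool)

x = 7
sub = flags[:, :, x]  # one 2-D sub-matrix
print(sub.shape)      # (4, 5)

# One 2-D slice per value of x, i.e. 64 slices (and 64 files) in total
slices = [flags[:, :, x] for x in range(flags.shape[2])]
print(len(slices))    # 64
```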

I've looked at a few examples: https://www.machinelearningplus.com/python/parallel-processing-python/ and https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming

Here is the function:

def multi_flagger(antenna_no):
    recv=data.corr_products[antenna_no][0]         # Gets the antenna name and polarization value
    flagger=data.flags[:, :, antenna_no]         # Flags the data based on the antenna value
    mat_flag = np.matrix(flagger)                # Morphs the 2d array into a matrix
    np.save('Flagged_data_'+str(recv)+'_.npy', mat_flag)  # Saves the data to a file

pool.map(multi_flagger, for i in range(2))
pool.close()

I get this error:

  File "<ipython-input-58-e1a6f9779b9a>", line 1
    pool.map(multi_flagger, for i in range(2))
                            ^
SyntaxError: invalid syntax

What I would like is 64 .npy files written to disk, one per value of "x".

Upvotes: 0

Views: 611

Answers (2)

B.Eng

Reputation: 23

So the question had to do with a specific package called "katdal", used for radio telescope data. I figured out the solution: the problem was the flagger line. The data needed to be read in each time the function is called:

def multi_flagger(antenna_no):
    data = katdal.open('/'+prefix+'/'+fname+'/'+fname+'/'+fname+'_sdp_l0.full.rdb')
    recv = data.corr_products[antenna_no][0]    # Gets the antenna and polarization value
    flagger = data.flags[:, :, antenna_no]      # Flags the data based on the antenna value
    np.save('Flagged_data_'+str(recv)+'_.npy', np.matrix(flagger))  # Morphs the 2d array into a matrix and saves the data to a file

But this leaves me with a new issue: the data file is opened on every call, which costs both time and memory.

Upvotes: 0

Gino Mempin

Reputation: 29536

The method Pool.map(func, iterable) expects an iterable, such as a list or a tuple. You cannot pass a for loop as you tried (for i in range(2)): a bare for loop is not an expression, which is why Python raises the SyntaxError. Instead, you can pass a list of numbers from a range:

pool.map(multi_flagger, list(range(2)))  # [0, 1]

I'm not sure about the rest of your code, but when using a Pool, you set the number of worker processes when you create the Pool instance (as explained in the Using a pool of workers example) and then call map to pass in the function to execute and the function inputs:

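from multiprocessing import Pool
import numpy as np

# data is the already-opened dataset from the question
def multi_flagger(antenna_no):
    recv = data.corr_products[antenna_no][0]
    flagger = data.flags[:, :, antenna_no]
    mat_flag = np.matrix(flagger)
    np.save('Flagged_data_' + str(recv) + '_.npy', mat_flag)

if __name__ == '__main__':                       # needed on platforms that spawn workers (e.g. Windows)
    with Pool(processes=3) as pool:              # set the number of worker processes
        pool.map(multi_flagger, list(range(2)))  # pass a list of antenna_no; leaving the with block cleans up the pool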

Upvotes: 1
