Reputation: 23
I have a 32-core machine with 256 GB of RAM and am new to parallel computing. I have to run this line:
Flag = data.flags[:, :, x]
where data.flags is a very large matrix and "x" selects the sub-matrix that I want to store to file. But "x" changes 64 times and each iteration takes about 8 minutes, hence the need to parallelize it.
I've looked at a few examples: https://www.machinelearningplus.com/python/parallel-processing-python/ and https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming
Function for code:
def multi_flagger(antenna_no):
    recv = data.corr_products[antenna_no][0]  # Gets the antenna name and polarization value
    flagger = data.flags[:, :, antenna_no]  # Flags the data based on the antenna value
    mat_flag = np.matrix(flagger)  # Morphs the 2d array into a matrix
    np.save('Flagged_data_' + str(recv) + '_.npy', mat_flag)  # Saves the data to a file
pool.map(multi_flagger, for i in range(2))
pool.close()
I get this error:
  File "<ipython-input-58-e1a6f9779b9a>", line 1
    pool.map(multi_flagger, for i in range(2))
                            ^
SyntaxError: invalid syntax
What I would like is 64 .npy files written to disk.
Upvotes: 0
Views: 611
Reputation: 23
So the question had to do with a specific package called "katdal", which is used with radio telescope data. I figured out the solution: the problem was the flagger line. The data set has to be opened inside the function, so that it is read on each call:
import katdal
import numpy as np

def multi_flagger(antenna_no):
    # prefix and fname are defined earlier in the session
    data = katdal.open('/'+prefix+'/'+fname+'/'+fname+'/'+fname+'_sdp_l0.full.rdb')
    recv = data.corr_products[antenna_no][0]  # Gets the antenna and polarization value
    flagger = data.flags[:, :, antenna_no]  # Flags the data based on the antenna value
    np.save('Flagged_data_' + str(recv) + '_.npy', np.matrix(flagger))  # Morphs the 2d array into a matrix and saves it to a file
But this leaves me with a new issue: the data file is now opened again on every call, which costs both time and memory.
Upvotes: 0
Reputation: 29536
The method Pool.map(func, iterable) expects an iterable, which can be a tuple or a list. You cannot pass a for loop as you have tried (for i in range(2)), but you can instead pass a list of numbers from a range:
pool.map(multi_flagger, list(range(2))) # [0, 1]
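To illustrate what counts as an iterable here, the following minimal sketch passes a list, a range, and a generator expression to Pool.map. The function square is a made-up stand-in for the real per-antenna work, and the pool size of 2 is arbitrary.

```python
from multiprocessing import Pool


def square(n):
    # Stand-in for the real per-antenna work.
    return n * n


def run_examples():
    with Pool(processes=2) as pool:
        from_list = pool.map(square, [0, 1, 2, 3])
        from_range = pool.map(square, range(4))
        # A generator expression is a valid argument; a bare "for" clause is not.
        from_generator = pool.map(square, (i for i in range(4)))
    return from_list, from_range, from_generator


if __name__ == "__main__":
    for result in run_examples():
        print(result)  # each line prints [0, 1, 4, 9]
```

A range object is already an iterable, so wrapping it in list() is optional.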
I'm not sure about the rest of your code, but when using a Pool, you set the number of worker processes when you create the Pool instance (as explained in the Using a pool of workers example) and then call map to pass in the function to execute and the function inputs:
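from multiprocessing import Pool
import numpy as np

def multi_flagger(antenna_no):
    # data is the already opened data set from the question
    recv = data.corr_products[antenna_no][0]
    flagger = data.flags[:, :, antenna_no]
    mat_flag = np.matrix(flagger)
    np.save('Flagged_data_' + str(recv) + '_.npy', mat_flag)

with Pool(processes=3) as pool:  # set the number of worker processes
    pool.map(multi_flagger, list(range(2)))  # pass a list of antenna_no
# Leaving the with block shuts the pool down, so no explicit pool.close() is needed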
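Scaled up to the 64 values mentioned in the question, the same pattern might look like the sketch below. It is only an outline under a few assumptions: output_name is a hypothetical stand-in for multi_flagger that returns the file name instead of writing a file, and the worker count is simply capped at the number of CPU cores. The if __name__ == "__main__" guard matters on platforms that start workers by re-importing the main module (Windows, and macOS by default).

```python
import os
from multiprocessing import Pool

N_ANTENNAS = 64  # number of slices mentioned in the question


def output_name(antenna_no):
    # Hypothetical stand-in for multi_flagger: returns the file name it would write.
    return "Flagged_data_" + str(antenna_no) + "_.npy"


def run_all(n_workers):
    with Pool(processes=n_workers) as pool:
        # map blocks until every task has finished and keeps the input order.
        return pool.map(output_name, range(N_ANTENNAS))


if __name__ == "__main__":
    # At most one worker per CPU core, and never more workers than tasks.
    workers = min(N_ANTENNAS, os.cpu_count() or 1)
    names = run_all(workers)
    print(len(names), names[0], names[-1])
```

On a 32-core machine this would run 32 tasks at a time, so the 64 slices would be processed in roughly two rounds if each task takes a similar amount of time.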
Upvotes: 1