Reputation: 41
First of all, thanks for this community and all the advice we can find here; it's really appreciated!
This is my first venture into parallel processing. I have been looking into Dask on my own, but I am having trouble actually coding it... to be honest, I am really lost.
In one of my projects, I want to request URLs and retrieve observation data (from meteorological stations) stored in XML files.
For each URL, I run several steps: retrieve the data from the URL, parse the XML into a dataframe, apply a filter, and store the result in a MySQL database.
So I need to loop this process over thousands of URLs (stations)...
I wrote sequential code, and it takes 300 s to finish, which is really too long and not efficient.
As we are applying the same process to each station, I think I can speed up the computation, but I don't know where to start. I used delayed from dask, but I don't think it's the best approach.
This is my code so far. First, I have some functions:
import wget
import pandas as pd
import xml.etree.ElementTree as ETree

def xml_to_dataframe(ood_xml):
    # list_parameters (the parameter names to extract) is assumed to be defined globally
    tmp_file = wget.download(ood_xml)
    prstree = ETree.parse(tmp_file)
    root = prstree.getroot()
    ################ Section to retrieve data for one station and apply parameter
    all_obs = []
    for obs in root.iter('observations'):
        ood_observation = []
        for param in list_parameters:
            # assumes each parameter name is also the XML tag to look up
            x = obs.find(param).text
            ood_observation.append(x)
        all_obs.append(ood_observation)
    return pd.DataFrame(all_obs, columns=list_parameters)
def filter_criteria(df, threshold, criteria):
    # Return the index labels of the rows whose criteria value is >= threshold
    if criteria in df.columns:
        result = []
        for index, row in df.iterrows():
            if pd.to_numeric(row[criteria], errors='coerce') >= threshold:
                result.append(index)
        return result
    else:
        #print(criteria + ' parameter does not exist for this station !!! ')
        return []
def get_and_filter_data(filename, criteria, threshold):
    try:
        xmlToDf = xml_to_dataframe(filename)
        final_df = xmlToDf.loc[filter_criteria(xmlToDf, threshold, criteria)]
        # some MySQL connection and instructions....
    except Exception:
        # errors are silently ignored, so a failing station is simply skipped
        pass
And then the main code I want to parallelise:
criteria = 'temperature'
threshold = 22
filenames = ['url1.html', 'url2.html', 'url3.html']  # placeholder URLs
for file in filenames:
    get_and_filter_data(file, criteria, threshold)
Do you have any advice on how to do it?
Many thanks for your help!
Guillaume
Upvotes: 1
Views: 214
Reputation: 16571
I'm not 100% sure this is what you are after, but one way is via delayed:
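from dask import delayed, compute
# build one lazy task per URL; nothing runs until compute() is called
delayeds = [delayed(get_and_filter_data)(file, criteria, threshold) for file in filenames]
# unpack the list so that results is a tuple with one entry per file
results = compute(*delayeds)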
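Since most of the time per station is probably spent waiting on the download and the database, a thread pool may already be enough. Below is a minimal sketch of the same fan-out using the standard library's concurrent.futures instead of Dask. The get_and_filter_data here is a hypothetical stand-in that sleeps to mimic a download, and run_in_threads and max_workers are names made up for this illustration. It assumes each call opens its own database connection rather than sharing one across threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def get_and_filter_data(filename, criteria, threshold):
    """Hypothetical stand-in for the real function: sleeps to mimic a download."""
    time.sleep(0.05)
    return f"{filename}:{criteria}>={threshold}"


def run_in_threads(filenames, criteria, threshold, max_workers=8):
    """Run get_and_filter_data for every URL in a thread pool.

    Results are returned in the same order as filenames.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() schedules each call; the tasks run concurrently in the pool
        futures = [
            pool.submit(get_and_filter_data, name, criteria, threshold)
            for name in filenames
        ]
        # result() blocks until that task is done and re-raises its exception, if any
        return [future.result() for future in futures]


if __name__ == "__main__":
    filenames = [f"url{i}.html" for i in range(1, 9)]  # placeholder URLs
    results = run_in_threads(filenames, "temperature", 22)
    print(len(results), "stations processed")
```

If you would rather stay with Dask, delayed tasks use the threaded scheduler by default, and you can pass scheduler="threads" or scheduler="processes" to compute to choose one explicitly. Processes are more likely to help if the XML parsing and filtering turn out to dominate the runtime.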
Upvotes: 1