Guimeteo

Reputation: 41

How to use Dask to apply a function to each file in a list?

First of all, thanks to this community for all the advice we can find here; it's really appreciated!
This is my first venture into parallel processing. I have been looking into Dask on my own, but I am having trouble actually coding it... to be honest, I am really lost.

In one of my projects, I want to hit URLs and retrieve observation data (meteorological stations) from XML files. For each URL, I run several steps: retrieve the data from the URL, parse the XML into a dataframe, apply a filter, and store the data in a MySQL database.
So I need to loop over these steps for thousands of URLs (stations)...

I wrote sequential code, and it takes 300 s to finish the computation, which is really too long and not efficient.

Since we apply the same process to each station, I think I can speed up the computation, but I don't know where to start. I used delayed from Dask, but I don't think it's the best approach.

This is my code so far. First, I have some functions:

import wget
import pandas as pd
import xml.etree.ElementTree as ETree

def xml_to_dataframe(ood_xml):
    tmp_file = wget.download(ood_xml)
    prstree = ETree.parse(tmp_file)
    root = prstree.getroot()
    # Retrieve the data for one station: one row per <observations>
    # element, one column per requested parameter
    all_obs = []
    for obs in root.iter('observations'):
        ood_observation = []
        for param in list_parameters:
            x = obs.find(param).text
            ood_observation.append(x)
        all_obs.append(ood_observation)
    return pd.DataFrame(all_obs, columns=list_parameters)
        
def filter_criteria(df, threshold, criteria):
    if criteria in df.columns:
        result = []
        for index, row in df.iterrows():
            if pd.to_numeric(row[criteria], errors='coerce') >= threshold:
                result.append(index)
        return result
    else:
        # print(criteria + ' parameter does not exist for this station!')
        return []
  
def get_and_filter_data(filename, criteria, threshold):
    try:
        xmlToDf = xml_to_dataframe(filename)
        final_df = xmlToDf.loc[filter_criteria(xmlToDf, threshold, criteria)]
        # ... some MySQL connection and insert instructions ...
    except Exception:
        pass
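For reference, here is a self-contained sketch of the XML-to-DataFrame step above, run on an inline XML string instead of a downloaded file. The tag names (`observations`, `temperature`, `humidity`) are illustrative assumptions, not the real station schema:

```python
import xml.etree.ElementTree as ETree
import pandas as pd

list_parameters = ['temperature', 'humidity']

# A tiny stand-in for one station's XML file (assumed schema)
xml_text = """
<station>
  <observations><temperature>23.5</temperature><humidity>60</humidity></observations>
  <observations><temperature>21.0</temperature><humidity>55</humidity></observations>
</station>
"""

root = ETree.fromstring(xml_text)
all_obs = []
for obs in root.iter('observations'):
    # look up each requested parameter inside the <observations> element
    all_obs.append([obs.find(param).text for param in list_parameters])

df = pd.DataFrame(all_obs, columns=list_parameters)
print(df)  # two rows, one column per parameter
```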

and then the main loop I want to parallelise:

criteria = 'temperature'
threshold = 22   
filenames = ['url1.html', 'url2.html', 'url3.html']

for file in filenames:
    get_and_filter_data(file, criteria, threshold)
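As a side note, independent of the parallelisation question: the row-by-row loop in filter_criteria can be replaced by a vectorized pandas comparison with the same semantics (indices of rows whose value parses as a number and is >= the threshold). A minimal sketch, with filter_criteria_vectorized as a hypothetical name:

```python
import pandas as pd

def filter_criteria_vectorized(df, threshold, criteria):
    # Same result as the iterrows() loop: the indices of rows whose
    # `criteria` value parses as a number and is >= threshold.
    if criteria not in df.columns:
        return []
    values = pd.to_numeric(df[criteria], errors='coerce')  # non-numeric -> NaN
    return df.index[values >= threshold].tolist()          # NaN compares False

df = pd.DataFrame({'temperature': ['25', '18', 'n/a', '22']})
print(filter_criteria_vectorized(df, 22, 'temperature'))  # → [0, 3]
```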

Do you have any advice on how to do it?

Many thanks for your help !

Guillaume

Upvotes: 1

Views: 214

Answers (1)

SultanOrazbayev

Reputation: 16571

Not 100% sure this is what you are after, but one way is via delayed:

from dask import delayed, compute

delayeds = [delayed(get_and_filter_data)(file, criteria, threshold) for file in filenames]
results = compute(delayeds)
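Since the workload is dominated by network I/O (downloads and database writes), the same fan-out over URLs can also be sketched with the standard library's ThreadPoolExecutor, if you prefer not to add a Dask dependency. Here `fetch` is a dummy stand-in for get_and_filter_data, which needs network and MySQL access:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(filename, criteria, threshold):
    # placeholder: the real function would download, parse,
    # filter and store the station's data
    return (filename, criteria, threshold)

filenames = ['url1.html', 'url2.html', 'url3.html']

# Threads suit I/O-bound work: each download can wait on the
# network while others make progress
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(fetch, f, 'temperature', 22) for f in filenames]
    results = [fut.result() for fut in futures]  # in submission order

print(results)
```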

Upvotes: 1
