Sam Dean

Reputation: 399

Parallelized DataFrame Custom Function Dask

I am trying to use Dask's multiprocessing features to speed up a for-loop operation over a pandas DataFrame. I am fully aware that looping over DataFrames is generally not best practice, but in my case it is required. I have read fairly extensively through the documentation and other similar questions, but I cannot seem to figure out my problem.

df.head()
         Title                                                                                                                                       Content
0  Lizzibtz     @Ontario2020 @Travisdhanraj @fordnation Maybe.  They are not adding to the stress of education during Covid. Texas sample.  Plus…  
1  Jess 🌱🛹🏳️‍🌈  @BetoORourke So ashamed at how Abbott has not handled COVID in Texas. A majority of our large cities are hot spots with no end in sight.    
2  sidi diallo  New post (PVC Working Gloves) has been published on Covid-19 News Info - Texas test                    
3  Kautillya    @PandaJay What was the need to go to SC for yatra anyway? Isn't covid cases spiking exponentially? Ambubachi mela o… texas
4  SarahLou♡    RT @BenJolly9: 23rd June 2020 was the day Sir Keir Starmer let the Tories off the hook for their miss-handling of COVID-19. texas   

I have a custom python function defined as:

import numpy as np
import spacy

# Assumption: any spaCy pipeline with an NER component works here; the model name is illustrative.
nlp = spacy.load("en_core_web_sm")

def locMp(df):
    hitList = []
    for i in range(len(df)):
        string = df.iloc[i]['Content']
        doc = nlp(string)
        # Keep the geopolitical-entity ("GPE") spans spaCy's NER found in this row
        ents = [e.text for e in doc.ents if e.label_ == "GPE"]
        x = np.array(ents)
        hitList.append(np.unique(x))

    df['Locations'] = hitList
    return df

This function adds a DataFrame column of locations extracted with the spaCy NLP library. I do not think the library itself is important, but I want you to see the whole function.

Now, according to the documentation and a few other questions out there, the way to use Dask's multiprocessing on a DataFrame is to create a Dask DataFrame with some number of partitions, call map_partitions, and then .compute(). So I have tried the following, among other options, with no luck:

part = 7
ddf = dd.from_pandas(df, npartitions=part)
location = ddf.map_partitions(lambda df: df.apply(locMp), meta=pd.DataFrame).compute()

# and...

part = 7
ddf = dd.from_pandas(df, npartitions=part)
location = ddf.map_partitions(locMp, meta=pd.DataFrame).compute()

# and simplifying from Dask documentation

part = 7
ddf = dd.from_pandas(df, npartitions=part)
location = ddf.map_partitions(locMp)
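
I suspect the meta argument is part of the problem: as I understand the docs, meta should describe the output of locMp (the input columns plus the new Locations column), not the pd.DataFrame class itself. A sketch of what I think the call should look like:

import dask.dataframe as dd
import pandas as pd

part = 7
ddf = dd.from_pandas(df, npartitions=part)
# Empty frame with the output schema: the original columns plus an object-dtype Locations column
meta = df.head(0).assign(Locations=pd.Series(dtype="object"))
location = ddf.map_partitions(locMp, meta=meta).compute()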

I have tried a few other things with dask.delayed (one sketch below), but nothing seems to work: I either get a Dask Series or some other undesired output, or the function takes as long as, or longer than, just running it regularly. How can I use Dask to speed up custom DataFrame operations and return a clean pandas DataFrame?
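
For reference, the dask.delayed variant looked roughly like this (a sketch: split the pandas frame into pieces, wrap each locMp call in delayed, compute in parallel, and stitch the results back together with pd.concat):

import dask
import pandas as pd

n = 7
size = -(-len(df) // n)  # ceiling division so every row lands in some chunk
chunks = [df.iloc[i * size:(i + 1) * size] for i in range(n)]
parts = [dask.delayed(locMp)(chunk) for chunk in chunks]  # build the lazy task graph
location = pd.concat(dask.compute(*parts))  # run the chunks in parallel, then reassemble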

Thank you

Upvotes: 0

Views: 283

Answers (1)

joebeeson

Reputation: 4366

You could try letting Dask handle the application instead of doing the looping yourself:

ddf["Locations"] = ddf["Content"].apply(
    lambda string: [e.text for e in nlp(string).ents if e.label_ == "GPE"],
    meta=("Content", "object"))

Upvotes: 1
