neuro9

Reputation: 29

Using Dask Delayed on Small/Partitioned Dataframes

I am working with time series data where each row is a single ID/time/data observation. This means the rows don't correspond 1-to-1 with the IDs: each ID has many rows across time.

I am trying to use dask.delayed to run a function on an entire ID sequence (the operation should be able to run on each individual ID at the same time, since the IDs don't affect each other). To do this I first loop through each of the ID tags, pull out all the data for that ID (with .loc in pandas, so it is a separate "mini" df), delay the function call on the mini df, add a column with the delayed values, and append the mini df to a list of all mini dfs. At the end of the for loop I want to call dask.compute() on all the mini dfs at once, but for some reason the mini dfs' values are still delayed. Below is some pseudocode of what I just explained.

I have a feeling that this may not be the best way to go about it, but it's what made sense at the time, and I can't understand what's wrong, so any help would be very much appreciated.

Here is what I am trying to do:

import dask

list_of_mini_dfs = []
for id in big_df['id'].unique():

    # pull out this ID's rows as a separate "mini" df
    curr_df = big_df.loc[big_df['id'] == id]
    curr_df['new value 1'] = dask.delayed(myfunc)(args1)
    curr_df['new value 2'] = dask.delayed(myfunc)(args2) #same func as previous line

    list_of_mini_dfs.append(curr_df)

# compute everything at once -- but the two new columns come back still delayed
list_of_mini_dfs = dask.delayed(list_of_mini_dfs).compute()

Then I concat all the mini dfs into a new big df.

As you can see from the code, I have to reach into my big/overall dataframe to pull out each ID's sequence of data, since it is interspersed throughout the rows. I want to call a delayed function on that single ID's data and then return the values from the function call into the big/overall dataframe.

Currently this method is not working: when I concat all the mini dataframes back together, the two values I delayed are still delayed. This leads me to think it is due to the way I am delaying a function within a df and then trying to compute the list of dataframes, but I just can't see how to fix it.

Hopefully this was relatively clear and thank you for the help.

Upvotes: 0

Views: 464

Answers (1)

rpanai

Reputation: 13437

IIUC you are trying to do a sort of transform using dask.

import pandas as pd
import dask.dataframe as dd
import numpy as np

# generate big_df
dates = pd.date_range(start='2019-01-01',
                      end='2020-01-01')
l = len(dates)
out = []
for i in range(1000):
    df = pd.DataFrame({"ID":[i]*l,
                       "date": dates,
                       "data0": np.random.randn(l),
                       "data1": np.random.randn(l)})

    out.append(df)

# shuffle the rows so each ID's data is interspersed, as in the question
big_df = pd.concat(out, ignore_index=True)\
           .sample(frac=1)\
           .reset_index(drop=True)

Now you want to apply your function fun to columns data0 and data1.
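fun isn't defined in the question, so for the snippets below to actually run, assume a simple stand-in that reduces each ID's rows to a single row per column, e.g. a per-ID mean (hypothetical; any function with the same shape works):

def fun(g):
    # hypothetical stand-in: g is one ID's sub-frame with columns
    # data0/data1; reduce it to one row (here, the per-ID mean)
    return g.mean()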

Pandas

out = big_df.groupby("ID")[["data0","data1"]]\
            .apply(fun)\
            .rename(columns={'data0': 'new_values0',
                             'data1': 'new_values1'})\
            .reset_index() # rename avoids _x/_y suffixes in the merge below

df_pd = pd.merge(big_df, out, how="left", on="ID")

Dask

df = dd.from_pandas(big_df, npartitions=4)

out = df.groupby("ID")[["data0","data1"]]\
        .apply(fun, meta={'data0':'f8',
                          'data1':'f8'})\
        .rename(columns={'data0': 'new_values0',
                         'data1': 'new_values1'})\
        .compute() # Here you need to compute otherwise you'll get NaNs

df_dask = dd.merge(df, out,
                   how="left", 
                   left_on=["ID"],
                   right_index=True)
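To sanity-check that the two versions agree (with the fun sketched above), you can materialize the dask result and compare it to the pandas one; a minimal sketch:

# dd.merge returns a lazy dask dataframe, so materialize it first,
# then sort both results into the same row order before comparing
res_dask = df_dask.compute().sort_values(["ID", "date"]).reset_index(drop=True)
res_pd = df_pd.sort_values(["ID", "date"]).reset_index(drop=True)

pd.testing.assert_frame_equal(res_pd, res_dask)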

The dask version is not necessarily faster than the pandas one, in particular if your df fits in RAM.
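If in doubt, it is easy to time the two groupby/apply steps on your own data; a minimal sketch reusing big_df, df, and fun from above:

import time

t0 = time.perf_counter()
big_df.groupby("ID")[["data0", "data1"]].apply(fun)
print(f"pandas: {time.perf_counter() - t0:.2f}s")

t0 = time.perf_counter()
df.groupby("ID")[["data0", "data1"]]\
  .apply(fun, meta={"data0": "f8", "data1": "f8"})\
  .compute()
print(f"dask:   {time.perf_counter() - t0:.2f}s")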

Upvotes: 1
