OctavianWR

Reputation: 227

Str split with expand in Dask Dataframe

I have 34 million rows and only one column. I want to split the string into 4 columns.

Here is my sample dataset (df):

    Log
0   Apr  4 20:30:33 100.51.100.254 dns,packet user: --- got query from 10.5.14.243:30648:
1   Apr  4 20:30:33 100.51.100.254 dns,packet user: id:78a4 rd:1 tc:0 aa:0 qr:0 ra:0 QUERY 'no error'
2   Apr  4 20:30:33 100.51.100.254 dns,packet user: question: tracking.intl.miui.com:A:IN
3   Apr  4 20:30:33 dns user: query from 9.5.10.243: #4746190 tracking.intl.miui.com. A

I want to split it into four columns using this code:

df1 = df['Log'].str.split(n=3, expand=True)
df1.columns=['Month','Date','Time','Log']
df1.head()

Here is the result that I expected:

     Month Date      Time                                              Log
0      Apr    4  20:30:33  100.51.100.254 dns,packet user: --- go...
1      Apr    4  20:30:33  100.51.100.254 dns,packet user: id:78a...
2      Apr    4  20:30:33  100.51.100.254 dns,packet user: questi...
3      Apr    4  20:30:33  dns transjakarta: query from 9.5.10.243: #474...
4      Apr    4  20:30:33  100.51.100.254 dns,packet user: --- se...

but the response is like this:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-36-c9b2023fbf3e> in <module>
----> 1 df1 = df['Log'].str.split(n=3, expand=True)
      2 df1.columns=['Month','Date','Time','Log']
      3 df1.head()

TypeError: split() got an unexpected keyword argument 'expand'

Is there any solution to split the string using Dask?

Upvotes: 4

Views: 2586

Answers (2)

Weston A. Greene

Reputation: 137

For those like me who did not find str.split with the expand=True argument to work even when using the n= parameter: the "Old answer" from @MRocklin is incomplete (at least it was for me), because it is missing a join of the new DataFrame with the original:

import pandas

def f(df: pandas.DataFrame) -> pandas.DataFrame:
    df1 = df['Log'].str.split(n=3, expand=True)
    df1.columns = ['Month', 'Date', 'Time', 'Log']
    # Drop the original 'Log' before joining so the column names don't collide.
    df = df.drop(columns=['Log']).join(df1)
    return df

Now, for me, there was another gotcha:

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['1', '2', '3', '4']
  Missing: []

To rectify, add:

for col in range(4):  # the split produces integer column labels 0..3
    df1[col] = df1.get(col, float('nan'))

And here's the whole shebang:

import dask.dataframe as dd
import pandas as pd

data = {
    'Log': ['a;1;2;3', 'b;1;', 'c', 'd;1;2'],
    'dummy_col': ['a', 'b', 'c', 'd'],
}
df = dd.from_dict(data, npartitions=2)  # I made sure to choose a partition count lower than my row number.

def f(df: pd.DataFrame) -> pd.DataFrame:
    tmp_df = df['Log'].str.split(';', expand=True)  # New DF of just the splits. Looks something like this:
        # (but remember, Dask is passing _partitions_ to `f`, not the whole DF)
        #
        #       0   1   2   3
        #   0   a   1   2   3
        #   1   b   1       <NA>
    df = df.drop(columns=['Log'])
    new_col_names = ['Month', 'Date', 'Time', 'Log']
    # Rename only the columns present in the current _partition_. Because `f`
    # receives partitions, not the whole DF, a partition whose rows happen to
    # produce fewer splits will also have fewer columns.
    tmp_df.columns = new_col_names[:len(tmp_df.columns)]
    for col in new_col_names:
        tmp_df[col] = tmp_df.get(col, float('nan'))  # Fill in with NaN if this partition lacks some of the newly split columns.
    df = df.join(tmp_df)  # Add the newly split columns to the existing columns.
    return df

df = df.map_partitions(f)
df.compute()

Thank you so much @MRocklin for your answer to this question (and the many other answers you have throughout SO). Could not have solved my problem without your help.

Upvotes: 0

MRocklin

Reputation: 57281

Edit: this works now

Dask dataframe does support the str.split method's expand= keyword if you provide an n= keyword as well to tell it how many splits to expect.
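
Here's a minimal sketch of the now-working call (assuming a reasonably recent Dask version; the sample rows mirror the question):

import pandas as pd
import dask.dataframe as dd

# Two sample rows mirroring the question's data.
pdf = pd.DataFrame({'Log': [
    'Apr  4 20:30:33 100.51.100.254 dns,packet user: --- got query from 10.5.14.243:30648:',
    'Apr  4 20:30:33 dns user: query from 9.5.10.243: #4746190 tracking.intl.miui.com. A',
]})
ddf = dd.from_pandas(pdf, npartitions=2)

# n=3 caps the split at three, yielding exactly four columns, so Dask can
# build the output metadata without sampling the data.
df1 = ddf['Log'].str.split(n=3, expand=True)
df1.columns = ['Month', 'Date', 'Time', 'Log']
print(df1.compute())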

Old answer

It looks like Dask dataframe's str.split method doesn't implement the expand= keyword. You might raise an issue if one does not already exist.

As a short-term workaround, you could write a Pandas function and then use the map_partitions method to apply it across your Dask dataframe:

def f(df: pandas.DataFrame) -> pandas.DataFrame:
    """ This is your code from above, as a function """
    df1 = df['Log'].str.split(n=3, expand=True)
    df1.columns=['Month','Date','Time','Log']
    return df

ddf = ddf.map_partitions(f)  # apply to all pandas dataframes within dask dataframe

Because Dask dataframes are just collections of Pandas dataframes, it's relatively easy to build things yourself when Dask dataframe doesn't support them.
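
If you hit the metadata-inference ValueError described in the other answer, note that map_partitions also accepts an explicit meta= argument. A minimal sketch, assuming a corrected f that returns exactly the four split columns:

# Hypothetical: declare the output schema up front so Dask skips
# metadata inference entirely. Assumes `f` returns exactly these columns.
meta = {'Month': 'object', 'Date': 'object', 'Time': 'object', 'Log': 'object'}
ddf = ddf.map_partitions(f, meta=meta)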

Upvotes: 8
