sachinruk

Reputation: 9869

Incompatibility of apply in dask and pandas dataframes

A sample of the triggers column in my Dask dataframe looks like the following:

0    [Total Traffic, DNS, UDP]
1                    [TCP RST]
2              [Total Traffic]
3                 [IP Private]
4                       [ICMP]
Name: triggers, dtype: object

I wish to create a one-hot encoded version of the above arrays (putting a 1 in the DNS column for row 0, for example) by doing the following, where pop_triggers contains all possible values of triggers.

for trig in pop_triggers:
    df[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0)

However, the Total Traffic, DNS, etc. columns all contain 0 rather than 1 where relevant. When I copy the data into a pandas dataframe and perform the same operation, the columns get the expected values.

a = df[[ 'Total Traffic', 'UDP', 'NTP Amplification', 'triggers', 'ICMP']].head()
for trig in pop_triggers:
    a[trig] = a.triggers.apply(lambda x: 1 if trig in x else 0)
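For reference, the pandas version of this loop behaves as described; a minimal sketch with made-up sample rows (the data below is illustrative, not the asker's):

```python
import pandas as pd

a = pd.DataFrame({'triggers': [['Total Traffic', 'DNS', 'UDP'], ['TCP RST']]})
for trig in ['Total Traffic', 'UDP', 'DNS']:
    # pandas evaluates each apply eagerly, so trig still holds the
    # intended value when the lambda runs.
    a[trig] = a.triggers.apply(lambda x: 1 if trig in x else 0)

print(a[['Total Traffic', 'UDP', 'DNS']].values.tolist())  # → [[1, 1, 1], [0, 0, 0]]
```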

What am I missing here? Is it because dask is lazy that somehow it's not filling in the values as expected?

Edit 1: I investigated some places where the flag was set in the first place (which turned out to be far fewer than I expected) and got some really weird results. See below:

df2 = df[df['Total Traffic']==1]
df2[['triggers']+pop_triggers].head()

output:

        triggers    Total Traffic   UDP DNS
9380    [ICMP, IP null, IP Private, TCP null, TCP SYN,...   1   1   1
9388    [ICMP, IP null, IP Private, TCP null, TCP SYN,...   1   1   1
19714   [ICMP, IP null, IP Private, UDP, NTP Amplifica...   1   1   1
21556   [IP null]   1   1   1
21557   [IP null]   1   1   1

possible bug maybe?

Edit 2: Minimal working example:

triggers = [['Total Traffic', 'DNS', 'UDP'],['TCP RST'],['Total Traffic'],['IP Private'],['ICMP']]*10
df2 = dd.from_pandas(pd.DataFrame({'triggers':triggers}), npartitions=16)
pop_triggers= ['Total Traffic', 'UDP', 'DNS', 'TCP SYN', 'TCP null', 'ICMP']
for trig in pop_triggers:
    df2[trig] = df2.triggers.apply(lambda x: 1 if trig in x else 0)
df2.head()

Output:

triggers    Total Traffic   UDP DNS TCP SYN TCP null    ICMP
0   [Total Traffic, DNS, UDP]   0   0   0   0   0   0
1   [TCP RST]   0   0   0   0   0   0
2   [Total Traffic] 0   0   0   0   0   0
3   [IP Private]    0   0   0   0   0   0

Note: I am far more concerned about the Dask side of things than the Pandas side.

Upvotes: 2

Views: 490

Answers (1)

Alex

Reputation: 579

In my experience, apply in dask works much better with explicit metadata. There is some functionality that lets dask attempt to guess the metadata, but I found it slow and not always reliable; the official guidance is also to specify meta.

Another point, in my experience, is that assign works better than df[col] = .... I'm not sure whether that is a bug, a limitation, or misuse on my side (I researched it a while ago and I don't think it's a bug).

EDIT: The first pattern below doesn't work: the trig value captured for earlier columns in the loop gets updated to later values, so at compute time every column reflects only the last value!

It's not a bug, but the combination of dask not computing immediately and the lambda closing over the loop variable, which isn't evaluated until compute time. See this discussion for why it doesn't work.
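The late-binding behavior can be reproduced without dask at all; a minimal sketch in plain Python:

```python
# Lambdas capture the *variable* trig, not its value at definition time.
# If the lambdas run only after the loop has finished (as with dask's
# deferred compute), every one of them sees the final loop value.
funcs = []
for trig in ['DNS', 'UDP', 'ICMP']:
    funcs.append(lambda x: 1 if trig in x else 0)

# All three lambdas now check for 'ICMP', the last loop value:
print([f(['DNS']) for f in funcs])   # → [0, 0, 0]
print([f(['ICMP']) for f in funcs])  # → [1, 1, 1]
```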

The pattern I originally suggested (which falls into the trap above) was:

cols = {}
for trig in pop_triggers:
    meta = (trig, int)
    # Broken: the lambda closes over trig itself, so at compute time
    # every column uses the final loop value.
    cols[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0, meta=meta)
df = df.assign(**cols)

CORRECT PATTERN:

(Sorry, I didn't test this previously; I run the same pattern, except that I don't use the looping variable inside the applied function, so I never hit that behavior.)

cols = {}

for trig in pop_triggers:
    meta = (trig, int)

    def fn(x, t):
        return 1 if t in x else 0

    cols[trig] = ddf.triggers.apply(fn, args=(trig,), meta=meta)
ddf = ddf.assign(**cols)

Upvotes: 3
