FábioRB
FábioRB

Reputation: 439

Dask (delayed) vs pandas/function returns

I am trying to study a little bit about dask as a solution my parallel computing over some big data I have.

I have a code where I check a list of transactions and extract the number of active customers on every period (an active customer is a customer that has any transaction with last 90 days).

This is the code for sample data :

import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import dask.dataframe as dd
import dask 

num_variables = 10000
rng = np.random.default_rng()

df = pd.DataFrame({
    'id' :  np.random.randint(1,999999999,num_variables),
    'date' : [np.random.choice(pd.date_range(datetime(2021,6,1),datetime(2022,12,31))) for i in range(num_variables)],
    'product' : [np.random.choice(['giftcards', 'afiliates']) for i in range(num_variables)],
    'brand' : [np.random.choice(['brand_1', 'brand_2', 'brand_4', 'brand_6']) for i in range(num_variables)],
    'gmv': rng.random(num_variables) * 100,
    'revenue': rng.random(num_variables) * 100})

This is the "way 1" to execute (using pandas and simple functions)

def active_clients(df : pd.DataFrame , date : date):
    date1 = (date - timedelta(days=90))
    date2 = date
    clients_base = df.loc[(df['date'].dt.date >= date1) & (df['date'].dt.date <= date2),'id'].nunique()
    return (date, clients_base)

months = []
results = []

dates = df.date.dt.to_period('M').drop_duplicates()
for i in dates:
    test = pd.Period(i,freq='M').end_time.date()
    months.append(test)

for i in months:
    test = active_clients(df,i)
    results.append(test)

results

The result here is a list of tuples:

[(datetime.date(2022, 7, 31), 24),
 (datetime.date(2022, 10, 31), 48),
 (datetime.date(2022, 12, 31), 43),
 (datetime.date(2022, 8, 31), 42),
 (datetime.date(2022, 9, 30), 46),
 (datetime.date(2022, 11, 30), 46),
 (datetime.date(2022, 6, 30), 11)]

This is the "way 2" to execute (using dask delayed and functions)

Now I am trying to do exactly the same by using dask delayed as a way to paralelize calculation.

@dask.delayed
def active_clients(df : pd.DataFrame , date : date):
    date1 = (date - timedelta(days=90))
    date2 = date
    clients_base = df.loc[(df['date'].dt.date >= date1) & (df['date'].dt.date <= date2),'id'].nunique()
    return (date, clients_base)

months = []
results = []

dates = df.date.dt.to_period('M').drop_duplicates()
for i in dates:
    test = dask.delayed(pd.Period(i,freq='M').end_time.date())
    months.append(test)

for i in months:
    test = dask.delayed(active_clients(df,i))
    results.append(test)

resultados = dask.compute(results)

resultados:

([(datetime.date(2022, 7, 31), 24),
  (datetime.date(2022, 10, 31), 48),
  (datetime.date(2022, 12, 31), 43),
  (datetime.date(2022, 8, 31), 42),
  (datetime.date(2022, 9, 30), 46),
  (datetime.date(2022, 11, 30), 46),
  (datetime.date(2022, 6, 30), 11)],)

The issues here are:

  1. the code above returns me a tuple of a list of a tuple (different from the other code)
  2. It does not seen to parallelize as only of one cores seems to be under hard work. What am I doing wrong?

Thanks

Upvotes: 1

Views: 121

Answers (1)

SultanOrazbayev
SultanOrazbayev

Reputation: 16551

One quick fix to your code is to remove nested delayed calls, as the relevant function is already decorated with delayed so there is no need to wrap it in another delayed:

@dask.delayed
def active_clients(df : pd.DataFrame , date : date):
    date1 = (date - timedelta(days=90))
    date2 = date
    clients_base = df.loc[(df['date'].dt.date >= date1) & (df['date'].dt.date <= date2),'id'].nunique()
    return (date, clients_base)

months = []
results = []

dates = df.date.dt.to_period('M').drop_duplicates()
months = [pd.Period(i,freq='M').end_time.date() for i in dates]

for i in months:
    test = active_clients(df,i)  # note this will be delayed due to decoration of active_clients
    results.append(test)

resultados = dask.compute(*results)  # this will return a single list of results

The result of dask.compute will return a tuple as the code is intended to be used with multiple delayed values, so if you unpack the list of delayeds, then the computed results will be placed in resultados as a tuple.

Upvotes: 1

Related Questions