Reputation: 1071
I have the following code:
import dask.dataframe as dd

def func1(df):
    x = 1
    print('Processing func1')
    return x

def func2(df):
    x = 2
    print('Processing func2')
    return x

ddf = dd.from_pandas(df, npartitions=3)

print('func1 processing started')
ddf = ddf.map_partitions(func1)
print('func1 processing ended')

print('func2 processing started')
ddf = ddf.map_partitions(func2)
print('func2 processing ended')

ddf.compute()
What I'm looking for is a way to log (or, in this case, print) the steps before, during, and after each map_partitions call executes. However, since ddf.compute() triggers the map_partitions work only after the print statements have already run, I get something like this:
func1 processing started
func1 processing ended
func2 processing started
func2 processing ended
Processing func1
Processing func1
Processing func1
Processing func2
Processing func2
Processing func2
Instead, I need:
func1 processing started
Processing func1
Processing func1
Processing func1
func1 processing ended
func2 processing started
Processing func2
Processing func2
Processing func2
func2 processing ended
How can I make this work? Note: my example uses print, but I would like to synchronize map_partitions with any Python function.
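To make the lazy behavior concrete, here is a minimal, self-contained sketch (the names are hypothetical, not from my real code); passing meta explicitly stops Dask from calling the function early during graph construction:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': range(6)})
ddf = dd.from_pandas(df, npartitions=3)

def double(part):
    print('Processing partition')
    return part * 2

# Only builds the task graph; with meta supplied, Dask does not call double() here.
lazy = ddf.map_partitions(double, meta=ddf._meta)
print('graph built, nothing executed yet')

lazy.compute()  # 'Processing partition' prints only now, once per partition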
UPDATE
A more realistic scenario:
def func1():
    ddf = dd.read_csv('file.csv')  # partition count is driven by blocksize; read_csv has no npartitions argument
    log('In func1')
    ddf = func11(ddf, 123)
    log('func1 ended')
    ddf.compute()

def func11(df, x):
    log('In func11')
    # ... do stuff with df
    df = func111(df, x)
    return df

def func111(df, x):
    log('In func111')
    df = df.map_partitions(func1111)
    return df

def func1111(df):
    df['abc'] = df['abc'] * 2
    log('Processing func1111')
    return df

def log(msg):
    print(msg)  # or write to a file or DB
The requirement is that this should print:
In func1
In func11
In func111
Processing func1111
Processing func1111
Processing func1111
func1 ended
Upvotes: 1
Views: 301
Reputation: 43083
You can wrap ddf in a small class that queues up the ddf.map_partitions() and log() calls, then replays the queue at compute time, using ddf.persist() and wait(ddf) to make sure all queued partition work has finished before each log message is emitted.
from dask.distributed import wait

class QueuedMapPartitionsWrapper:
    def __init__(self, ddf, queue=None):
        self.ddf = ddf
        self.queue = queue or []

    def map_partitions(self, func, *args, **kwargs):
        # Queue the call instead of applying it right away.
        return self.__class__(self.ddf, self.queue + [(True, func, args, kwargs)])

    def log(self, *args, **kwargs):
        return self.__class__(self.ddf, self.queue + [(False, log, args, kwargs)])

    def compute(self):
        ddf = self.ddf
        for (map_partitions, func, args, kwargs) in self.queue:
            if map_partitions:
                ddf = ddf.map_partitions(func, *args, **kwargs)
            else:
                # Force all queued partition work to finish before logging
                # (persist/wait assume a dask.distributed client).
                ddf = ddf.persist()
                wait(ddf)
                func(*args, **kwargs)
        return ddf.compute()
Usage 1:
ddf = dd.from_pandas(df, npartitions=3)
ddf = QueuedMapPartitionsWrapper(ddf)
ddf = ddf.log('func1 processing started')
ddf = ddf.map_partitions(func1)
ddf = ddf.log('func1 processing ended')
ddf = ddf.log('func2 processing started')
ddf = ddf.map_partitions(func2)
ddf = ddf.log('func2 processing ended')
ddf.compute()
Usage 2:
def func1():
    ddf = dd.read_csv('file.csv')
    ddf = QueuedMapPartitionsWrapper(ddf)   # Either here
    log('In func1')
    ddf = func11(ddf, 123)
    # ddf = QueuedMapPartitionsWrapper(ddf) # or here (just before ddf.log)
    ddf = ddf.log('func1 ended')
    ddf.compute()
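For the nested helpers from the question's update, the same pattern works if they route their messages through the wrapper's log method instead of calling log() directly; a sketch under that assumption:

def func11(ddf, x):
    ddf = ddf.log('In func11')   # queued, printed in order at compute time
    # ... do stuff with ddf
    ddf = func111(ddf, x)
    return ddf

def func111(ddf, x):
    ddf = ddf.log('In func111')
    ddf = ddf.map_partitions(func1111)   # also queued by the wrapper
    return ddf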
Upvotes: 2
Reputation: 2245
Perhaps you could try wrapping your functions with a decorator. If you put the desired logging behavior in a decorator, you don't have to scatter print statements all over your code.
import numpy as np
import pandas as pd
import dask.dataframe as dd
import time
from functools import wraps

def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time.time()
        print("Starting :%r" % f.__name__)
        result = f(*args, **kw)
        te = time.time()
        print('Completed :%r took: %2.2f sec' % (f.__name__, te - ts))
        return result
    return wrap

@timing
def func1():
    df = pd.DataFrame(np.random.random([3000, 10]))
    ddf = dd.from_pandas(df, npartitions=3)
    time.sleep(1)
    ddf = func11(ddf, None)
    df = ddf.compute()
    return df

@timing
def func11(ddf, x):
    time.sleep(1)
    ddf = func111(ddf, None)
    return ddf

@timing
def func1111(df):
    df = df * 2
    time.sleep(1)
    return df

@timing
def func111(ddf, x):
    ddf = ddf.map_partitions(func1111)
    time.sleep(1)
    return ddf

df = func1()
This gives the output:
Starting :'func1'
Starting :'func11'
Starting :'func111'
Starting :'func1111'
Completed :'func1111' took: 1.00 sec
Completed :'func111' took: 2.01 sec
Completed :'func11' took: 3.01 sec
Starting :'func1111'
Starting :'func1111'
Starting :'func1111'
Completed :'func1111' took: 1.00 sec
Completed :'func1111' took: 1.00 sec
Completed :'func1111' took: 1.00 sec
Completed :'func1' took: 5.02 sec
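Note the lone Starting/Completed pair for func1111 near the top: it comes from Dask's metadata inference, which calls the mapped function once on a small dummy frame while map_partitions builds the graph. If that extra call is unwanted, supplying meta explicitly skips it; a sketch reusing the names above:

@timing
def func111(ddf, x):
    # With meta given, Dask skips the inference call to func1111 at graph-build time.
    ddf = ddf.map_partitions(func1111, meta=ddf._meta)
    time.sleep(1)
    return ddf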
Upvotes: 1
Reputation: 16561
One possibility is to create a new function that contains the desired sequence of actions:
def my_flow(df):
    print('func1 processing started')
    df = func1(df)
    print('func1 processing ended')
    print('func2 processing started')
    df = func2(df)
    print('func2 processing ended')
    return df

ddf_new = ddf.map_partitions(my_flow)
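Keep in mind that my_flow runs inside every partition, so each message prints once per partition rather than once per stage; this also assumes func1 and func2 actually transform and return the partition (unlike the toy versions in the question that return an int). A usage sketch:

ddf_new.compute()  # each message fires once per partition (plus once during Dask's meta inference)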
Upvotes: 1