In the Dask code below, I set x to 1 and then to 2, each time right before a map_partitions call. The result looks correct, but I don't fully understand why. If Dask waits to run the two map_partitions calls until it reaches compute(), and x is already 2 at that point, how does Dask know that x = 1 in the first map_partitions call?
```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({
    'id': [1, 1, 1, 2, 2, 3, 4, 1, 2, 2, 1],
    'balance': [150, 140, 130, 280, 260, 220, 230, 330, 420, 120, 210]
})
ddf = dd.from_pandas(pdf, npartitions=2)

def func(df, a):
    return a

x = 1
ddf['should_be_1'] = ddf.map_partitions(func, x, meta='int')
x = 2
ddf['should_be_2'] = ddf.map_partitions(func, x, meta='int')
ddf.compute()
```
```
    id  balance  should_be_1  should_be_2
0    1      150            1            2
1    1      140            1            2
2    1      130            1            2
3    2      280            1            2
4    2      260            1            2
5    3      220            1            2
6    4      230            1            2
7    1      330            1            2
8    2      420            1            2
9    2      120            1            2
10   1      210            1            2
```
The computations are delayed, but the arguments are not. Python evaluates x at the moment you call map_partitions (or any delayed function), and Dask stores that value in the task graph. Rebinding x afterwards does not change the value already recorded for an earlier call:
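```python
from dask import delayed

@delayed
def f(x):
    return x

x = 1
a = f(x)  # the task behind a records the value 1
x = 2
b = f(x)  # the task behind b records the value 2

# Each Delayed object's graph holds the function together with the
# argument value that was captured when f was called
print(dict(a.dask))
# {'some_hash': (<function f at 0x7fab1b72c550>, 1)}
print(dict(b.dask))
# {'some_hash': (<function f at 0x7fab1b72c550>, 2)}
```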
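To make the mechanism concrete without Dask, here is a minimal pure-Python sketch. It assumes a toy dict-based graph whose entries are (function, *args) tuples, loosely modeled on the tuples printed in the graphs above; lazy_call and compute are hypothetical helpers written for illustration, not Dask APIs. It also contrasts argument capture with a closure that reads the global x only when it runs.

```python
# Toy "task graph": key -> (function, *args) tuple.
# Hypothetical illustration only; this is not Dask's implementation.
graph = {}

def func(df, a):
    return a

def lazy_call(key, fn, *args):
    # Python evaluates the arguments before lazy_call runs, so the graph
    # stores the current value of x, not a reference to the name "x".
    graph[key] = (fn,) + args
    return key

def compute(key):
    # Execute the stored task only when asked for it.
    fn, *args = graph[key]
    return fn(*args)

x = 1
k1 = lazy_call("should_be_1", func, None, x)
x = 2
k2 = lazy_call("should_be_2", func, None, x)
print(compute(k1), compute(k2))  # 1 2

# Contrast: a lambda that reads the global x has nothing captured in the
# graph, so it sees whatever x is at compute time (late binding).
x = 1
k3 = lazy_call("late_bound", lambda: x)
x = 2
print(compute(k3))  # 2
```

The same reasoning applies to the question: x is passed as an argument to map_partitions, so each call's tasks carry the value x had at that moment. If func instead read a global x inside its body, both columns would be expected to hold 2, because the lookup would happen only when compute() runs.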