edesz

Reputation: 12406

Dask DataFrame filter and repartition gives some empty partitions

I am trying to filter a Dask DataFrame and then use map_partitions to apply a function to each partition. The function expects a pandas DataFrame with at least 1 row.

Here is the code to generate some dummy data as a pandas DataFrame (which is then converted to a Dask DataFrame) for an MCVE

import dask.dataframe as dd
import numpy as np
import pandas as pd

def create_data(n):
    # n distinct store_ids, each with exactly 6 rows of random data
    df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
    random_integers = np.random.default_rng().choice(14, size=n, replace=False)
    df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
    return df

df = create_data(n=10)
print(df.head(15))
>>>
    store_id         A
0         10  0.850730
1         10  0.581119
2         10  0.825802
3         10  0.657797
4         10  0.291961
5         10  0.864984
6          9  0.161334
7          9  0.397162
8          9  0.089300
9          9  0.435914
10         9  0.750741
11         9  0.920625
12         3  0.635727
13         3  0.425270
14         3  0.904043

Structure of the data: for each store_id, there are exactly 6 rows.
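
As a quick sanity check of this structure (my own verification snippet, not part of the pipeline):

# every store_id should appear exactly 6 times
assert (df.groupby("store_id").size() == 6).all()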

Now I create a list of store_ids that I want to use to filter the above data

filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
print(filtered_store_ids)
>>> [13, 12, 11, 10, 9, 7]

I then convert the above data (a pandas DataFrame) into a dask.dataframe

ddf = dd.from_pandas(df, npartitions=10)

Now I print the partitions of the ddf

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=6
Partition Index=9, Number of Rows=6

This is expected. Each partition has 6 rows and one (unique) store_id. So, each partition contains data for a single store_id.
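
This can be confirmed directly (a small verification sketch of my own, computing the number of distinct store_ids per partition):

# expect a 1 for every partition
print(ddf.map_partitions(lambda d: d["store_id"].nunique()).compute())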

I now filter the Dask dataframe using the list of store_ids from above

ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]

Again I print the partitions of the filtered ddf

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=0
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=0
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=0
Partition Index=9, Number of Rows=6

This is expected since each partition has one store_id and, by filtering, some partitions will be filtered out entirely and so they will contain zero rows.
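
As an aside, the per-partition sizes can also be obtained in a single computation, instead of one get_partition call per partition:

# returns one length per partition in a single pass
print(ddf.map_partitions(len).compute())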

So now I'll re-partition the filtered DataFrame per the Dask DataFrame best practices

ddf = ddf.repartition(npartitions=len(filtered_store_ids))
print(ddf)
>>>
Dask DataFrame Structure:
              store_id        A
npartitions=6                  
0                int64  float64
6                  ...      ...
...                ...      ...
48                 ...      ...
59                 ...      ...
Dask Name: repartition, 47 tasks

I expected this re-partitioning operation to result in only evenly sized, non-empty partitions. But when I re-print the partitions, I get output similar to the previous one (uneven partition sizes and some empty partitions), as though the re-partitioning did not happen

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=12
Partition Index=5, Number of Rows=6
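
Inspecting the divisions (a quick probe) suggests that the repartition concatenated adjacent partitions along the existing divisions rather than redistributing rows, which would explain the surviving empty partition and the double-sized one

# divisions are the index boundaries between partitions
print(ddf.divisions)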

My next step is to apply a function to each partition after filtering, but this won't work, since some of the partitions are empty pandas DataFrames, which the function cannot handle.

def myadd(df):
    assert df.shape[0] > 0  # fails on empty partitions
    ...
    return ...

ddf.map_partitions(myadd)
>>>
AssertionError                            Traceback (most recent call last)
.
.
.
AssertionError: 
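
As an aside, the assertion could be sidestepped by making the function tolerate empty input, but that merely hides the uneven partitioning rather than fixing it. A minimal guard sketch (myadd_guarded is a hypothetical name, with the real logic still elided as in myadd above):

def myadd_guarded(df):
    # workaround sketch: pass empty partitions through unchanged
    if df.shape[0] == 0:
        return df
    ...
    return ...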

The Dask documentation on re-partitioning is well-explained (same for the best practice that I linked above), and it seems simple enough. But after re-partitioning I am still getting some partitions with zero rows, and map_partitions will fail on them. I'm sure I'm missing something here.

There are a couple of SO posts about re-partitioning (1, 2) but they don't deal with empty partitions.

Question

Is there a way to ensure that, after re-partitioning, all partitions again have 6 rows each and there are no empty partitions? I.e., is it possible to have a re-partitioned Dask DataFrame with equally sized (non-empty) partitions?

EDIT

It looks like Dask cannot currently deal with empty partitions: issues 1, 2. These could be related to the problem I am experiencing here.

Upvotes: 2

Views: 1750

Answers (1)

edesz

Reputation: 12406

I found two existing posts on SO (which provide the helpers cull_empty_partitions and _rebalance_ddf) and I used them as follows to solve this problem.

Start with the original code from the question (no changes needed)

.
<identical code from question here>
.
ddf = ddf.repartition(npartitions=len(filtered_store_ids))

Next, I just call the two functions in succession on the repartitioned ddf

ddf = cull_empty_partitions(ddf)  # remove empties
ddf = _rebalance_ddf(ddf)         # re-size
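
For reference, here are sketches of what those two helpers look like, adapted from the linked posts. Treat them as illustrative rather than canonical: the exact code lives in those answers, and sorted_division_locations is an internal Dask helper whose import path may differ across Dask versions.

import dask.dataframe as dd
import numpy as np

def cull_empty_partitions(ddf):
    # drop zero-length partitions by rebuilding the dataframe
    # from the non-empty delayed partitions only
    lengths = ddf.map_partitions(len).compute()
    parts = [p for p, n in zip(ddf.to_delayed(), lengths) if n > 0]
    # ddf._meta is the empty pandas prototype of the frame
    return dd.from_delayed(parts, meta=ddf._meta)

def _rebalance_ddf(ddf):
    # repartition into roughly equal-sized partitions by computing
    # fresh divisions over a known, sorted index
    if not ddf.known_divisions:
        ddf = ddf.reset_index().set_index("index")
    index_counts = ddf.map_partitions(
        lambda df: df.index.value_counts().sort_index()
    ).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(
        index, npartitions=ddf.npartitions
    )
    return ddf.repartition(divisions=divisions)

Note that both helpers trigger real computation (the partition lengths and the index counts), so they add some cost on large DataFrames.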

When I now re-print partition sizes, all are evenly sized and none are empty

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6

Upvotes: 3
