AAriam

Reputation: 417

Python polars dataframe transformation: from flat dataframe to one dataframe per category

I have a flat dataframe representing data in multiple databases, where each database has multiple tables, each table has multiple columns, and each column has multiple values:

import polars as pl

df = pl.DataFrame(
    {
        'db_id': ["db_1", "db_1", "db_1", "db_2", "db_2", "db_2"],
        'table_id': ['tab_1', 'tab_1', 'tab_2', 'tab_1', 'tab_2', 'tab_2'],
        'column_id': ['col_1', 'col_2', 'col_1', 'col_2', 'col_1', 'col_3'],
        'data': [[1, 2, 3], [10, 20, 30], [4, 5], [40, 50], [6], [60]]
    }
)
shape: (6, 4)
┌───────┬──────────┬───────────┬──────────────┐
│ db_id ┆ table_id ┆ column_id ┆ data         │
│ ---   ┆ ---      ┆ ---       ┆ ---          │
│ str   ┆ str      ┆ str       ┆ list[i64]    │
╞═══════╪══════════╪═══════════╪══════════════╡
│ db_1  ┆ tab_1    ┆ col_1     ┆ [1, 2, 3]    │
│ db_1  ┆ tab_1    ┆ col_2     ┆ [10, 20, 30] │
│ db_1  ┆ tab_2    ┆ col_1     ┆ [4, 5]       │
│ db_2  ┆ tab_1    ┆ col_2     ┆ [40, 50]     │
│ db_2  ┆ tab_2    ┆ col_1     ┆ [6]          │
│ db_2  ┆ tab_2    ┆ col_3     ┆ [60]         │
└───────┴──────────┴───────────┴──────────────┘

As you can see, different databases share some tables, and tables share some columns.

I want to extract one dataframe per table_id from the above dataframe, where each extracted dataframe is transposed and exploded. That is, each extracted dataframe should have as its columns the set of column_ids belonging to that table_id (plus db_id), with the values taken from data. For the above example, the result should be a dictionary with keys "tab_1" and "tab_2", and the following dataframes as values:

tab_1:

shape: (5, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_2 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 1     ┆ 10    │
│ db_1  ┆ 2     ┆ 20    │
│ db_1  ┆ 3     ┆ 30    │
│ db_2  ┆ null  ┆ 40    │
│ db_2  ┆ null  ┆ 50    │
└───────┴───────┴───────┘

tab_2:

shape: (3, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_3 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 4     ┆ null  │
│ db_1  ┆ 5     ┆ null  │
│ db_2  ┆ 6     ┆ 60    │
└───────┴───────┴───────┘

I have a working function that does just that (see below), but it's a bit slow. So, I'm wondering if there is a faster way to achieve this?

This is my current solution:

from typing import Dict, Sequence

def dataframe_per_table(
    df: pl.DataFrame,
    col_name__table_id: str = "table_id",
    col_name__col_id: str = "column_id",
    col_name__values: str = "data",
    col_name__other_ids: Sequence[str] = ("db_id", )
) -> Dict[str, pl.DataFrame]:

    col_name__other_ids = list(col_name__other_ids)
    table_dfs = {}

    for (table_name, *_), table in df.group_by(
        [col_name__table_id] + col_name__other_ids
    ):
        new_table = table.select(
            pl.col(col_name__other_ids + [col_name__col_id, col_name__values])
        ).pivot(
            on=col_name__col_id,
            index=col_name__other_ids,
            values=col_name__values,
            aggregate_function=None,
        ).explode(
            columns=table[col_name__col_id].unique().to_list()
        )

        table_dfs[table_name] = pl.concat(
            [table_dfs.setdefault(table_name, pl.DataFrame()), new_table],
            how="diagonal"
        )
    return table_dfs
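
For reference, the variants below share this same signature, so calling any of them looks the same; a minimal usage sketch on the example frame above:

tables = dataframe_per_table(df)
print(tables.keys())    # dict_keys(['tab_1', 'tab_2']) -- order may vary
print(tables["tab_1"])  # the expected tab_1 frame (row order may differ,
                        # since group_by order is not guaranteed)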

Update: Benchmarking/Summary of Answers

On a dataframe with ~2.5 million rows, my original solution takes about 70 minutes to complete.

Disclaimer: since the execution times were too long, I only timed each solution once (i.e. 1 run, 1 loop), so the margin of error is large.

However, right after posting the question, I realized I could make it much faster simply by performing the concat in a separate loop, so that each final dataframe is created by a single concat operation instead of many:

def dataframe_per_table_v2(
    df: pl.DataFrame,
    col_name__table_id: str = "table_id",
    col_name__col_id: str = "column_id",
    col_name__values: str = "data",
    col_name__other_ids: Sequence[str] = ("db_id", )
) -> Dict[str, pl.DataFrame]:

    col_name__other_ids = list(col_name__other_ids)
    table_dfs = {}

    for (table_name, *_), table in df.group_by(
        [col_name__table_id] + col_name__other_ids
    ):
        new_table = table.select(
            pl.col(col_name__other_ids + [col_name__col_id, col_name__values])
        ).pivot(
            on=col_name__col_id,
            index=col_name__other_ids,
            values=col_name__values,
            aggregate_function=None,
        ).explode(
            columns=table[col_name__col_id].unique().to_list()
        )

        # Up until here nothing is changed.
        # Now, instead of directly concatenating, we just 
        #  append the new dataframe to a list
        table_dfs.setdefault(table_name, list()).append(new_table)

    # Now, in a separate loop, each final dataframe is created
    #  by concatenating all collected dataframes once.
    for table_name, table_sub_dfs in table_dfs.items():
        table_dfs[table_name] = pl.concat(
            table_sub_dfs,
            how="diagonal"
        )
    return table_dfs

This reduced the time from 70 min to about 10 min; much better, but still too long.

In comparison, the answer by @jqurious takes about 5 min. It needs an additional step at the end to remove the unwanted columns and get a dict from the list, but it's still much faster.
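
For completeness, that post-processing step could look something like the following sketch (my own addition, not from @jqurious's answer): take their list of per-table frames, key each by its table_id, and drop the index/table_id helper columns plus any pivot column that is entirely null for that table.

import polars as pl
from typing import Dict, List

def finalize_partitions(frames: List[pl.DataFrame]) -> Dict[str, pl.DataFrame]:
    # Hypothetical helper: build the desired dict from the list of frames.
    result = {}
    for frame in frames:
        table_id = frame.get_column("table_id")[0]
        # keep db_id plus every pivot column with at least one non-null value
        keep = ["db_id"] + [
            c
            for c in frame.columns
            if c not in ("index", "table_id", "db_id")
            and frame.get_column(c).null_count() < frame.height
        ]
        result[table_id] = frame.select(keep)
    return result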

However, the winner is by far the answer by @Dean MacGregor, taking only 50 seconds and directly producing the desired output.

Here is their solution re-written as a function:

def dataframe_per_table_v3(
    df: pl.DataFrame,
    col_name__table_id: str = "table_id",
    col_name__col_id: str = "column_id",
    col_name__values: str = "data",
    col_name__other_ids: Sequence[str] = ("db_id", )
) -> Dict[str, pl.DataFrame]:

    table_dfs = {
        table_id: df.filter(
            pl.col(col_name__table_id) == table_id
        ).with_columns(
            idx_data=pl.int_ranges(pl.col(col_name__values).list.len())
        ).explode(
            [col_name__values, 'idx_data']
        ).pivot(
            on=col_name__col_id,
            values=col_name__values,
            index=[*col_name__other_ids, 'idx_data'],
            aggregate_function='first'
        ).drop(
            'idx_data'
        )
        for table_id in df.get_column(col_name__table_id).unique()
    }
    return table_dfs

Upvotes: 2

Views: 538

Answers (3)

Dean MacGregor

Reputation: 18691

Let's break it down for a single table, so we'll just look at:

df.filter(pl.col('table_id')=='tab_1')

shape: (3, 4)
┌───────┬──────────┬───────────┬──────────────┐
│ db_id ┆ table_id ┆ column_id ┆ data         │
│ ---   ┆ ---      ┆ ---       ┆ ---          │
│ str   ┆ str      ┆ str       ┆ list[i64]    │
╞═══════╪══════════╪═══════════╪══════════════╡
│ db_1  ┆ tab_1    ┆ col_1     ┆ [1, 2, 3]    │
│ db_1  ┆ tab_1    ┆ col_2     ┆ [10, 20, 30] │
│ db_2  ┆ tab_1    ┆ col_2     ┆ [40, 50]     │
└───────┴──────────┴───────────┴──────────────┘

We want the output rows to be keyed by db_id combined with each element's position within the data list.

We need to create that position index explicitly, which we can do with int_ranges:

df.filter(pl.col('table_id')=='tab_1') \
    .with_columns(datai=pl.int_ranges(pl.col('data').list.len()))

shape: (3, 5)
┌───────┬──────────┬───────────┬──────────────┬───────────┐
│ db_id ┆ table_id ┆ column_id ┆ data         ┆ datai     │
│ ---   ┆ ---      ┆ ---       ┆ ---          ┆ ---       │
│ str   ┆ str      ┆ str       ┆ list[i64]    ┆ list[i64] │
╞═══════╪══════════╪═══════════╪══════════════╪═══════════╡
│ db_1  ┆ tab_1    ┆ col_1     ┆ [1, 2, 3]    ┆ [0, 1, 2] │
│ db_1  ┆ tab_1    ┆ col_2     ┆ [10, 20, 30] ┆ [0, 1, 2] │
│ db_2  ┆ tab_1    ┆ col_2     ┆ [40, 50]     ┆ [0, 1]    │
└───────┴──────────┴───────────┴──────────────┴───────────┘

Now we just explode/pivot to get

 df \
    .filter(pl.col('table_id')=='tab_1') \
    .with_columns(datai=pl.int_ranges(pl.col('data').list.len())) \
    .explode('data','datai') \
    .pivot(on='column_id', index=['db_id', 'datai'], values='data') \
    .drop('datai')


shape: (5, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_2 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 1     ┆ 10    │
│ db_1  ┆ 2     ┆ 20    │
│ db_1  ┆ 3     ┆ 30    │
│ db_2  ┆ null  ┆ 40    │
│ db_2  ┆ null  ┆ 50    │
└───────┴───────┴───────┘

Lastly, we just wrap the above in a dictionary comprehension, replacing the hardcoded 'tab_1' with our iterator:

{tab:df \
    .filter(pl.col('table_id')==tab) \
    .with_columns(datai=pl.int_ranges(pl.col('data').list.len())) \
    .explode(['data','datai']) \
    .pivot(on='column_id', index=['db_id','datai'], values='data') \
    .drop('datai') for tab in df.get_column('table_id').unique()}


{'tab_1': shape: (5, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_2 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 1     ┆ 10    │
│ db_1  ┆ 2     ┆ 20    │
│ db_1  ┆ 3     ┆ 30    │
│ db_2  ┆ null  ┆ 40    │
│ db_2  ┆ null  ┆ 50    │
└───────┴───────┴───────┘,
'tab_2': shape: (3, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_3 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 4     ┆ null  │
│ db_1  ┆ 5     ┆ null  │
│ db_2  ┆ 6     ┆ 60    │
└───────┴───────┴───────┘}

Upvotes: 2

Wayoshi

Reputation: 2903

To get a dict quickly enough, partition_by works along with the pivot you've done already:

{
    p: pdf.pivot(
        on="column_id", values="data", index="db_id", aggregate_function=None
    )
    for p, pdf in df.partition_by("table_id", as_dict=True).items()
}
{'tab_1': shape: (2, 3)
┌───────┬───────────┬──────────────┐
│ db_id ┆ col_1     ┆ col_2        │
│ ---   ┆ ---       ┆ ---          │
│ str   ┆ list[i64] ┆ list[i64]    │
╞═══════╪═══════════╪══════════════╡
│ db_1  ┆ [1, 2, 3] ┆ [10, 20, 30] │
│ db_2  ┆ null      ┆ [40, 50]     │
└───────┴───────────┴──────────────┘, 'tab_2': shape: (2, 3)
┌───────┬───────────┬───────────┐
│ db_id ┆ col_1     ┆ col_3     │
│ ---   ┆ ---       ┆ ---       │
│ str   ┆ list[i64] ┆ list[i64] │
╞═══════╪═══════════╪═══════════╡
│ db_1  ┆ [4, 5]    ┆ null      │
│ db_2  ┆ [6]       ┆ [60]      │
└───────┴───────────┴───────────┘}

I can't figure out the multi-column exploding in this case. Each null would need to be transformed into a list of nulls with the same length as the other lists in its row (which also raises the general case of lists of unequal length within a row, which would need to be padded with nulls).

Maybe some sort of big join? But I doubt that's any faster than your diagonal concat.
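
For illustration, here is a minimal sketch of that join idea for a single table (my own addition, not part of this answer; it assumes a recent Polars version): explode each pivoted list column separately together with a per-row position, then join the pieces back on db_id and position.

import polars as pl

pivoted = df.filter(pl.col("table_id") == "tab_1").pivot(
    on="column_id", values="data", index="db_id", aggregate_function=None
)
value_cols = [c for c in pivoted.columns if c != "db_id"]

out = None
for c in value_cols:
    part = (
        pivoted.select("db_id", c)
        .filter(pl.col(c).is_not_null())  # skip db_ids that lack this column
        .with_columns(pos=pl.int_ranges(pl.col(c).list.len()))
        .explode(c, "pos")
    )
    # full join on (db_id, pos) pads missing positions with nulls
    out = part if out is None else out.join(
        part, on=["db_id", "pos"], how="full", coalesce=True
    )

print(out.sort("db_id", "pos").drop("pos"))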

Upvotes: 1

jqurious

Reputation: 21580

It may perform better if you keep everything in a single frame and pivot once.

This would mean you'd need to filter out the "null" columns afterwards.

df_long = (
    df.with_row_index()
    .explode("data")
    .with_columns(pl.len().over("table_id", "column_id"))
    .with_columns(max_len=pl.col("len").max().over("table_id"))
)

(
    df_long
    .with_columns(pl.col("index") + 1)
    .join_asof(df_long, by="table_id", on="index")
    .with_columns(diff=pl.col("max_len") - pl.col("len_right"))
    .with_columns(
        pl.when(pl.col("len") != pl.col("max_len")).then(pl.col("diff")).fill_null(0)
    )
    .with_columns(
        pl.col("index").cum_count().over("table_id", "column_id") + pl.col("diff")
    )
    .pivot(
        on="column_id",
        index=["index", "db_id", "table_id"],
        values="data",
        aggregate_function=None,
    )
    .group_by("index", "table_id", maintain_order=True)
    .agg(pl.all().drop_nulls().first())
    .partition_by("table_id")
)

[shape: (5, 6)
┌───────┬──────────┬───────┬───────┬───────┬───────┐
│ index ┆ table_id ┆ db_id ┆ col_1 ┆ col_2 ┆ col_3 │
│ ---   ┆ ---      ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ str      ┆ str   ┆ i64   ┆ i64   ┆ i64   │
╞═══════╪══════════╪═══════╪═══════╪═══════╪═══════╡
│ 1     ┆ tab_1    ┆ db_1  ┆ 1     ┆ 10    ┆ null  │
│ 2     ┆ tab_1    ┆ db_1  ┆ 2     ┆ 20    ┆ null  │
│ 3     ┆ tab_1    ┆ db_1  ┆ 3     ┆ 30    ┆ null  │
│ 4     ┆ tab_1    ┆ db_2  ┆ null  ┆ 40    ┆ null  │
│ 5     ┆ tab_1    ┆ db_2  ┆ null  ┆ 50    ┆ null  │
└───────┴──────────┴───────┴───────┴───────┴───────┘, shape: (3, 6)
┌───────┬──────────┬───────┬───────┬───────┬───────┐
│ index ┆ table_id ┆ db_id ┆ col_1 ┆ col_2 ┆ col_3 │
│ ---   ┆ ---      ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ str      ┆ str   ┆ i64   ┆ i64   ┆ i64   │
╞═══════╪══════════╪═══════╪═══════╪═══════╪═══════╡
│ 1     ┆ tab_2    ┆ db_1  ┆ 4     ┆ null  ┆ null  │
│ 2     ┆ tab_2    ┆ db_1  ┆ 5     ┆ null  ┆ null  │
│ 3     ┆ tab_2    ┆ db_2  ┆ 6     ┆ null  ┆ 60    │
└───────┴──────────┴───────┴───────┴───────┴───────┘]

Upvotes: 1
