iJup

Reputation: 281

Spark DataFrame: get row wise sorted column names based on column values

For every row in the dataframe below, I want to get the column names (as an array, tuple, or similar) ordered by descending column value. So, for the dataframe

+---+---+---+---+---+
| ID|key|  a|  b|  c|
+---+---+---+---+---+
|  0|  1|  5|  2|  1|
|  1|  1|  3|  4|  5|
+---+---+---+---+---+

I want to find

+---+---+---+---+---+------------------+
| ID|key|  a|  b|  c|descending_columns|
+---+---+---+---+---+------------------+
|  0|  1|  5|  2|  1|           [a,b,c]|
|  1|  1|  3|  4|  5|           [c,b,a]|
+---+---+---+---+---+------------------+

Ideally and in general, I want to be able to iterate through pre-specified columns and apply a function based on those column entries. This could look like:

import pyspark.sql.functions as f

name_cols = ["a","b","c"]

values_ls = []
for col in name_cols: 
    ...schema specification....
    values_ls.append(f.col(col) ...get column value... )

df1 = df.withColumn("descending_columns", values_ls)

The problem sounds simple, but it seems quite challenging to implement efficiently in PySpark.

I am using pyspark version 2.3.3.

Upvotes: 1

Views: 1425

Answers (2)

pault

Reputation: 43534

For Spark versions below 2.4, you can achieve this without a udf by using sort_array and struct.

First, get a list of the columns to sort:

cols_to_sort = df.columns[2:]
print(cols_to_sort)
#['a', 'b', 'c']

Now build a struct with two elements - a "value" and a "key". The "key" is the column name and the "value" is the column value. If you ensure that the "value" comes first in the struct, you can use sort_array to sort this array of structs in the manner you want.

After the array is sorted, you just need to iterate over it and extract the "key" part, which contains the column names.

from pyspark.sql.functions import array, col, lit, sort_array, struct
df.withColumn(
    "descending_columns", 
    array(
        *[
            sort_array(
                array(
                    *[
                        struct([col(c).alias("value"), lit(c).alias("key")]) 
                        for c in cols_to_sort
                    ]
                ), 
                asc=False
            )[i]["key"]
            for i in range(len(cols_to_sort))
        ]
    )
).show(truncate=False)
#+---+---+---+---+---+------------------+
#|ID |key|a  |b  |c  |descending_columns|
#+---+---+---+---+---+------------------+
#|0  |1  |5  |2  |1  |[a, b, c]         |
#|1  |1  |3  |4  |5  |[c, b, a]         |
#+---+---+---+---+---+------------------+

Even though this looks complicated, it should offer better performance than the udf solution, because it uses only built-in column functions and avoids passing each row through Python.
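
To see why the "value" has to come first in the struct, here is a rough plain-Python analogy (not Spark code). Python tuples, like the structs passed to sort_array, are compared field by field from left to right. The helper name descending_columns and the dict standing in for a row are made up for illustration.

```python
# Plain-Python analogy of the struct + sort_array trick (no Spark involved).
# Tuples compare element by element, left to right, so putting the value
# first means the pairs are ordered by value.

def descending_columns(row, cols_to_sort):
    # row is a hypothetical dict standing in for one dataframe row
    pairs = [(row[c], c) for c in cols_to_sort]  # like struct(value, key)
    pairs.sort(reverse=True)                     # like sort_array(..., asc=False)
    return [key for _, key in pairs]             # like extracting ["key"]

rows = [
    {"ID": 0, "key": 1, "a": 5, "b": 2, "c": 1},
    {"ID": 1, "key": 1, "a": 3, "b": 4, "c": 5},
]
for row in rows:
    print(descending_columns(row, ["a", "b", "c"]))
# ['a', 'b', 'c']
# ['c', 'b', 'a']
```

If the key came first instead, the pairs would be ordered by column name and the values would only matter for breaking ties.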


Update: To sort by the original column order in the case of a tie in the value, you could insert another value in the struct which contains the index. Since the sort is descending, we use the negative of the index.

For example, if your input dataframe were the following:

df.show()
#+---+---+---+---+---+
#| ID|key|  a|  b|  c|
#+---+---+---+---+---+
#|  0|  1|  5|  2|  1|
#|  1|  1|  3|  4|  5|
#|  2|  1|  4|  4|  5|
#+---+---+---+---+---+

The last row above has a tie in value between a and b. We want a to sort before b in this case.

df.withColumn(
    "descending_columns", 
    array(
        *[
            sort_array(
                array(
                    *[
                        struct(
                            [
                                col(c).alias("value"), 
                                lit(-j).alias("index"), 
                                lit(c).alias("key")
                            ]
                        ) 
                        for j, c in enumerate(cols_to_sort)
                    ]
                ), 
                asc=False
            )[i]["key"]
            for i in range(len(cols_to_sort))
        ]
    )
).show(truncate=False)
#+---+---+---+---+---+------------------+
#|ID |key|a  |b  |c  |descending_columns|
#+---+---+---+---+---+------------------+
#|0  |1  |5  |2  |1  |[a, b, c]         |
#|1  |1  |3  |4  |5  |[c, b, a]         |
#|2  |1  |4  |4  |5  |[c, a, b]         |
#+---+---+---+---+---+------------------+
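
The effect of the negated index can be checked with the same kind of plain-Python analogy (again a sketch, not Spark code; the function names and the dict-based row are hypothetical). With only (value, key), a tie falls back to comparing the names in descending order; adding -index as the second field makes the earlier column win.

```python
# Plain-Python sketch of the tie-breaking idea (no Spark involved).

def descending_columns_plain(row, cols_to_sort):
    # (value, key): ties are broken by the column name, in descending order
    pairs = sorted(((row[c], c) for c in cols_to_sort), reverse=True)
    return [key for _, key in pairs]

def descending_columns_stable(row, cols_to_sort):
    # (value, -index, key): on a tie, the larger -index (the earlier column)
    # comes first in a descending sort
    triples = sorted(
        ((row[c], -j, c) for j, c in enumerate(cols_to_sort)),
        reverse=True,
    )
    return [key for _, _, key in triples]

row = {"a": 4, "b": 4, "c": 5}  # a and b are tied
print(descending_columns_plain(row, ["a", "b", "c"]))   # ['c', 'b', 'a']
print(descending_columns_stable(row, ["a", "b", "c"]))  # ['c', 'a', 'b']
```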

Upvotes: 1

tpain

Reputation: 144

You could insert the columns into a single struct and process that in a udf.

from pyspark.sql import functions as F
from pyspark.sql import types as T

name_cols = ['a', 'b', 'c']

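def ordered_columns(row):
    # Iterate the Row directly so the values line up with name_cols by position,
    # rather than relying on the ordering of row.asDict().
    return [x for _, x in sorted(zip(row, name_cols), reverse=True)]
udf_ordered_columns = F.udf(ordered_columns, T.ArrayType(T.StringType()))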

df1 = (
    df
    .withColumn(
        'row',
        F.struct(*name_cols)
    )
    .withColumn(
        'descending_columns',
        udf_ordered_columns('row')
    )
)

Something like this should work; if it doesn't, let me know.
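
The sorting logic inside the udf can be tried out without Spark. This is a minimal sketch in which a plain tuple stands in for the Row the udf would receive; order_names is a hypothetical helper that mirrors the sorting expression in the udf body.

```python
# Check the udf's sorting logic on plain tuples (no Spark involved).

name_cols = ['a', 'b', 'c']

def order_names(values, names):
    # Pair each value with its column name, sort descending, keep the names
    return [x for _, x in sorted(zip(values, names), reverse=True)]

print(order_names((5, 2, 1), name_cols))  # ['a', 'b', 'c']
print(order_names((3, 4, 5), name_cols))  # ['c', 'b', 'a']
print(order_names((4, 4, 5), name_cols))  # ['c', 'b', 'a'] (tie: later name first)
```

Note that ties are broken by column name in descending order here. Null entries would reach the function as None, and comparing None with a number raises a TypeError in Python 3, so rows with nulls would need separate handling.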

Upvotes: 1
