Reputation: 281
For every row in the dataframe below, I want to find the column names (as an array, tuple, or something else) ordered by descending column values. So, for the dataframe
+---+---+---+---+---+
| ID|key| a| b| c|
+---+---+---+---+---+
| 0| 1| 5| 2| 1|
| 1| 1| 3| 4| 5|
+---+---+---+---+---+
I want to find
+---+---+---+---+---+------------------+
| ID|key| a| b| c|descending_columns|
+---+---+---+---+---+------------------+
| 0| 1| 5| 2| 1| [a,b,c]|
| 1| 1| 3| 4| 5| [c,b,a]|
+---+---+---+---+---+------------------+
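For reference, a dataframe like the one above can be created with something like this (column types assumed to be integers):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Reproduce the example dataframe shown above (integer columns assumed)
df = spark.createDataFrame(
    [(0, 1, 5, 2, 1), (1, 1, 3, 4, 5)],
    ["ID", "key", "a", "b", "c"]
)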
Ideally and in general, I want to be able to iterate through pre-specified columns and apply a function based on those column entries. This could look like:
import pyspark.sql.functions as f
name_cols = ["a","b","c"]
values_ls = []
for col in name_cols:
    ...schema specification....
    values_ls.append(f.col(col) ...get column value... )
df1 = df.withColumn("descending_columns", values_ls)
The question is rather simple, but seems to be quite challenging to implement efficiently in pyspark.
I am using pyspark version 2.3.3.
Upvotes: 1
Views: 1425
Reputation: 43534
For Spark versions < 2.4, you can achieve this without a udf by using sort_array and struct.
First, get a list of the columns to sort:
cols_to_sort = df.columns[2:]
print(cols_to_sort)
#['a', 'b', 'c']
Now build a struct with two elements - a "value"
and a "key"
. The "key"
is the column name and the "value"
is the column value. If you ensure that the "value"
comes first in the struct
, you can use sort_array
to sort this array of structs in the manner you want.
After the array is sorted, you just need to iterate over it and extract the "key"
part, which contains the column names.
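To see what the intermediate array of structs looks like before the keys are extracted, you could run something like this (the sorted_structs column name is purely illustrative):
from pyspark.sql.functions import array, col, lit, sort_array, struct

# Illustration only: show the descending-sorted array of (value, key) structs
df.withColumn(
    "sorted_structs",
    sort_array(
        array(*[struct(col(c).alias("value"), lit(c).alias("key")) for c in cols_to_sort]),
        asc=False
    )
).show(truncate=False)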
from pyspark.sql.functions import array, col, lit, sort_array, struct
df.withColumn(
    "descending_columns",
    array(
        *[
            sort_array(
                array(
                    *[
                        struct([col(c).alias("value"), lit(c).alias("key")])
                        for c in cols_to_sort
                    ]
                ),
                asc=False
            )[i]["key"]
            for i in range(len(cols_to_sort))
        ]
    )
).show(truncate=False)
#+---+---+---+---+---+------------------+
#|ID |key|a |b |c |descending_columns|
#+---+---+---+---+---+------------------+
#|0 |1 |5 |2 |1 |[a, b, c] |
#|1 |1 |3 |4 |5 |[c, b, a] |
#+---+---+---+---+---+------------------+
Even though this looks complicated, it should offer better performance than the udf solution.
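As a side note (not needed for 2.3.3): if you later move to Spark 2.4+, the per-index extraction can be replaced by the SQL higher-order function transform, which maps over the sorted array in a single pass. A rough sketch under that assumption:
from pyspark.sql.functions import expr

# Spark 2.4+ only: build the array of (value, key) structs in SQL,
# sort it descending, and map each struct to its key in one pass
inner = ", ".join(
    "named_struct('value', {c}, 'key', '{c}')".format(c=c) for c in cols_to_sort
)
df.withColumn(
    "descending_columns",
    expr("transform(sort_array(array({0}), false), x -> x.key)".format(inner))
).show(truncate=False)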
Update: To sort by the original column order in the case of a tie in the value, you could insert another field into the struct that contains the column index. Since the sort is descending, we use the negative of the index.
For example, if your input dataframe were the following:
df.show()
#+---+---+---+---+---+
#| ID|key| a| b| c|
#+---+---+---+---+---+
#| 0| 1| 5| 2| 1|
#| 1| 1| 3| 4| 5|
#| 2| 1| 4| 4| 5|
#+---+---+---+---+---+
The last row above has a tie in value between a and b. We want a to sort before b in this case.
df.withColumn(
    "descending_columns",
    array(
        *[
            sort_array(
                array(
                    *[
                        struct(
                            [
                                col(c).alias("value"),
                                lit(-j).alias("index"),
                                lit(c).alias("key")
                            ]
                        )
                        for j, c in enumerate(cols_to_sort)
                    ]
                ),
                asc=False
            )[i]["key"]
            for i in range(len(cols_to_sort))
        ]
    )
).show(truncate=False)
#+---+---+---+---+---+------------------+
#|ID |key|a |b |c |descending_columns|
#+---+---+---+---+---+------------------+
#|0 |1 |5 |2 |1 |[a, b, c] |
#|1 |1 |3 |4 |5 |[c, b, a] |
#|2 |1 |4 |4 |5 |[c, a, b] |
#+---+---+---+---+---+------------------+
Upvotes: 1
Reputation: 144
You could insert the columns into a single struct and process that in a udf.
from pyspark.sql import functions as F
from pyspark.sql import types as T
name_cols = ['a', 'b', 'c']
def ordered_columns(row):
    return [x for _, x in sorted(zip(row.asDict().values(), name_cols), reverse=True)]
udf_ordered_columns = F.udf(ordered_columns, T.ArrayType(T.StringType()))
df1 = (
    df
    .withColumn(
        'row',
        F.struct(*name_cols)
    )
    .withColumn(
        'descending_columns',
        udf_ordered_columns('row')
    )
)
Something like this should work; if the above doesn't, let me know.
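To check the result (using the example dataframe from the question), you can drop the intermediate row column before displaying it:
# 'row' is only a helper struct column, so drop it when showing the result
df1.drop('row').show(truncate=False)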
Upvotes: 1