Reputation: 1966
I'm struggling to create an efficient voting system on a Spark DataFrame that I can apply to certain columns in order to select the desired records/rows. Let's say my data looks like this:
+-----+-----+-----+-----+-----+
|Names|    A|    B|    C|    D|
+-----+-----+-----+-----+-----+
|   X1| true| true| true| true|
|   X5|false| true| true|false|
|   X2|false|false|false|false|
|   X3| true| true| true|false|
|   X4|false|false|false| true|
|   X5|false| true| true|false|
|   X3| true| true| true|false|
+-----+-----+-----+-----+-----+
I want to create a new column named majority, which counts the number of true values in each row and labels the row:
abnormal if true values are the majority,
50-50 if true and false values are tied,
normal if false values are the majority.
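For reference, here is a minimal snippet to reproduce this sample (the true/false flags are assumed here to be stored as strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ('X1', 'true', 'true', 'true', 'true'),
    ('X5', 'false', 'true', 'true', 'false'),
    ('X2', 'false', 'false', 'false', 'false'),
    ('X3', 'true', 'true', 'true', 'false'),
    ('X4', 'false', 'false', 'false', 'true'),
    ('X5', 'false', 'true', 'true', 'false'),
    ('X3', 'true', 'true', 'true', 'false'),
]
df = spark.createDataFrame(data, ['Names', 'A', 'B', 'C', 'D'])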
In the absence of DataFrame.mode for Spark DataFrames, I tried to take inspiration from this post and write a get_mode(df) function, as below, using a window partition as suggested here & here:
I also tried to use a regex to read the true/false strings and count them (in voting_sys below), but it is clumsy:

from pyspark.sql import Window
from pyspark.sql import functions as F

def get_mode(df):
    counts = df.groupBy(['Names', 'A', 'B', 'C', 'D']).count().alias('count')
    #counts.show()
    win = Window().partitionBy('A', 'B', 'C', 'D').orderBy(F.col('count').desc())
    result = (counts
              .withColumn('majority', F.rank().over(win))
              #.where(F.col('majority') == 1)
              #.select('x', 'y')
              )
    #result.show()
    return result

df = get_mode(df)
def voting_sys(df):
    partition_columns = df.columns[1:]
    vote_results = (df
                    .withColumn('majority', F
                                .over(Window.partitionBy(partition_columns))
                                .when(F.isnull('majority'), '-')
                                .when(F.regexp_extract('majority', '(?i)^true', 0) > F.regexp_extract('majority', '(?i)^false', 0), 'abnormal')
                                .when(F.regexp_extract('majority', '(?i)^true', 0) == F.regexp_extract('majority', '(?i)^false', 0), '50-50')
                                .when(F.regexp_extract('majority', '(?i)^true', 0) < F.regexp_extract('majority', '(?i)^false', 0), 'normal')
                                #.otherwise('normal')
                                #.show()
                                )
                    )
    return vote_results
Note: I'm not interested in hacking this with df.toPandas().
Upvotes: 1
Views: 481
Reputation: 6082
I love @LiamFiddler's idea of casting Boolean to Integer. However, I don't recommend using a UDF here; it's unnecessary.
First, you can convert from string to Boolean, then from Boolean to Integer, like so: F.col(c).cast('boolean').cast('int'). I believe this transformation is not as expensive as it might sound.
Second, you don't have to hardcode the columns here (A, B, C, D); you can compute the sum with sum([F.col(c) for c in cols]).
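As a quick sanity check of that cast chain on string values, here is a minimal sketch (the toy single-column DataFrame is only for illustration and assumes an active spark session):

from pyspark.sql import functions as F

demo = spark.createDataFrame([('true',), ('false',)], ['A'])
demo.withColumn('A_int', F.col('A').cast('boolean').cast('int')).show()
# 'true' -> 1, 'false' -> 0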
Here is my working code:
cols = df.columns[1:]
# ['A', 'B', 'C', 'D']

(df
 .withColumn('sum', sum([F.col(c).cast('boolean').cast('int') for c in cols]))
 .withColumn('majority', F
             .when(F.col('sum') > len(cols) / 2, 'abnormal')
             .when(F.col('sum') == len(cols) / 2, '50-50')
             .when(F.col('sum') < len(cols) / 2, 'normal')
             )
 # order by abnormal, 50-50, normal
 .orderBy(F
          .when(F.col('majority') == 'abnormal', 1)
          .when(F.col('majority') == '50-50', 2)
          .when(F.col('majority') == 'normal', 3)
          )
 .show()
)
# Output
# +-----+-----+-----+-----+-----+---+--------+
# |Names| A| B| C| D|sum|majority|
# +-----+-----+-----+-----+-----+---+--------+
# | X3| true| true| true|false| 3|abnormal|
# | X3| true| true| true|false| 3|abnormal|
# | X1| true| true| true| true| 4|abnormal|
# | X5|false| true| true|false| 2| 50-50|
# | X5|false| true| true|false| 2| 50-50|
# | X4|false|false|false| true| 1| normal|
# | X2|false|false|false|false| 0| normal|
# +-----+-----+-----+-----+-----+---+--------+
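If you need to keep the labelled DataFrame for downstream filtering rather than just showing it, the same chain can be assigned to a variable; a minimal sketch (the names df_labeled and abnormal_rows are only illustrative):

df_labeled = (df
              .withColumn('sum', sum([F.col(c).cast('boolean').cast('int') for c in cols]))
              .withColumn('majority', F
                          .when(F.col('sum') > len(cols) / 2, 'abnormal')
                          .when(F.col('sum') == len(cols) / 2, '50-50')
                          .otherwise('normal')
                          )
              )

# e.g. keep only the rows where true values dominate
abnormal_rows = df_labeled.where(F.col('majority') == 'abnormal')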
Upvotes: 2
Reputation: 1441
My recommendation would be casting the Booleans as Integers, then taking an average of the four columns, which gives you a number between 0 and 1.
# if the values are stored as strings, you'll need a UDF to convert them to 0/1:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def true_false(s):
    return int(s.lower() == 'true')

tf_udf = udf(lambda x: true_false(x), IntegerType())

# withColumn returns a new DataFrame, so reassign df each time
for c in ['A', 'B', 'C', 'D']:
    df = df.withColumn(c, tf_udf(col(c)))

df = df.withColumn('majority', (df.A + df.B + df.C + df.D) / 4)
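If you then want the three labels from the question rather than the raw fraction, a minimal sketch on top of that majority column (the column name label is only illustrative) could be:

from pyspark.sql import functions as F

df = df.withColumn(
    'label',
    F.when(F.col('majority') > 0.5, 'abnormal')
     .when(F.col('majority') == 0.5, '50-50')
     .otherwise('normal')
)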
Upvotes: 0