Mario

Reputation: 1966

What is the best PySpark practice for filtering a Spark DataFrame with a voting system over certain columns?

I'm struggling to create an efficient voting system on a Spark DataFrame, applied to certain columns, for selecting the desired records/rows. Let's say my data looks like this:

+----------+-----------+-------------+----------+----------+
|   Names  |     A     |      B      |     C    |    D     |
+----------+-----------+-------------+----------+----------+
|        X1|       true|         true|      true|      true|
|        X5|      false|         true|      true|     false|
|        X2|      false|        false|     false|     false|
|        X3|       true|         true|      true|     false|
|        X4|      false|        false|     false|      true|
|        X5|      false|         true|      true|     false|
|        X3|       true|         true|      true|     false|
+----------+-----------+-------------+----------+----------+

I want to create a new column named majority that counts the number of true values in each row and labels the row 'abnormal' when true values are the majority, '50-50' when it's an even split, and 'normal' otherwise. Here are my attempts so far:
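In plain Python terms (just to pin down the rule I'm after, not the Spark implementation), the per-row logic would be:

```python
def label_row(votes):
    """Label one row from its list of boolean votes."""
    n_true = sum(votes)        # number of true votes in the row
    half = len(votes) / 2
    if n_true > half:
        return 'abnormal'      # true values are the majority
    if n_true == half:
        return '50-50'         # even split
    return 'normal'            # false values are the majority
```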

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def get_mode(df):
    # groupBy().count() already yields a column named 'count'
    counts = df.groupBy(['Names', 'A', 'B', 'C', 'D']).count()
    #counts.show()
    win = Window.partitionBy('A', 'B', 'C', 'D').orderBy(F.col('count').desc())
    result = (counts
              .withColumn('majority', F.rank().over(win))
              #.where(F.col('majority') == 1)
              #.select('x', 'y')
             )
    #result.show()
    return result

df = get_mode(df)

def voting_sys(df):
    partition_columns = df.columns[1:]
    vote_results = (df
        .withColumn('majority', F
            # .over(Window.partitionBy(partition_columns))  # F has no .over(); this line doesn't work
            .when(F.isnull('majority'), '-')
            .when(F.regexp_extract('majority', '(?i)^true', 0) >  F.regexp_extract('majority', '(?i)^false', 0), 'abnormal')
            .when(F.regexp_extract('majority', '(?i)^true', 0) == F.regexp_extract('majority', '(?i)^false', 0), '50-50')
            .when(F.regexp_extract('majority', '(?i)^true', 0) <  F.regexp_extract('majority', '(?i)^false', 0), 'normal')
            #.otherwise('normal')
        )
        #.show()
    )
    return vote_results

Note: I'm not interested in hacking it with df.toPandas().

Upvotes: 1

Views: 481

Answers (2)

pltc

Reputation: 6082

I love @LiamFiddler's idea of casting Boolean to Integer. However, I don't recommend using a UDF here; it's unnecessary.

First, you can convert from string to Boolean, then from Boolean to Integer, like so: F.col(c).cast('boolean').cast('int'). I believe this transformation is not as expensive as it might sound.

Second, you don't have to hardcode the columns here (A, B, C, D); you can compute the sum as sum([F.col(c) for c in cols]).

This is my working code:

from pyspark.sql import functions as F

cols = df.columns[1:]
# ['A', 'B', 'C', 'D']

(df
    .withColumn('sum', sum([F.col(c).cast('boolean').cast('int') for c in cols]))
    .withColumn('majority', F
        .when(F.col('sum')  > len(cols) / 2, 'abnormal')
        .when(F.col('sum') == len(cols) / 2, '50-50')
        .when(F.col('sum')  < len(cols) / 2, 'normal')
    )
    
    # order by abnormal, 50-50, normal
    .orderBy(F
        .when(F.col('majority') == 'abnormal', 1)
        .when(F.col('majority') == '50-50', 2)
        .when(F.col('majority') == 'normal', 3)
    )
    .show()
)

# Output
# +-----+-----+-----+-----+-----+---+--------+
# |Names|    A|    B|    C|    D|sum|majority|
# +-----+-----+-----+-----+-----+---+--------+
# |   X3| true| true| true|false|  3|abnormal|
# |   X3| true| true| true|false|  3|abnormal|
# |   X1| true| true| true| true|  4|abnormal|
# |   X5|false| true| true|false|  2|   50-50|
# |   X5|false| true| true|false|  2|   50-50|
# |   X4|false|false|false| true|  1|  normal|
# |   X2|false|false|false|false|  0|  normal|
# +-----+-----+-----+-----+-----+---+--------+

Upvotes: 2

whege

Reputation: 1441

My recommendation would be casting the Booleans as Integers, then taking the average of the four columns, which gives you a number between 0 and 1.

# if the values are stored as strings, you'll need a UDF to convert them to 0/1:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def true_false(s):
    return int(s.lower() == 'true')

tf_udf = udf(true_false, IntegerType())

# withColumn returns a new DataFrame, so reassign df each time
for c in ['A', 'B', 'C', 'D']:
    df = df.withColumn(c, tf_udf(col(c)))

df = df.withColumn('majority', (df.A + df.B + df.C + df.D) / 4)
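To turn that 0-to-1 average into the labels the question asks for, 0.5 marks an even split. A minimal plain-Python sketch of the mapping (the label names follow the other answer's convention; this step isn't part of the code above):

```python
def label_from_avg(avg):
    """Map the mean of the 0/1 votes to a label; 0.5 means an even split."""
    if avg > 0.5:
        return 'abnormal'   # more than half the votes were true
    if avg == 0.5:
        return '50-50'
    return 'normal'         # fewer than half the votes were true
```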

Upvotes: 0
