VMEscoli

Reputation: 1232

How to find distinct values for different groups in a PySpark dataframe and recode the dataframe

I have a big dataframe that contains groups of people, flagged in a column called "groups".

What I need to do now is present this dataframe in a more meaningful way.

For example, this is the table for group 148:

df.select('gender','postcode','age','groups','bought').filter(df.groups==148).show()   

+------+--------+---+----------+----------+
|gender|postcode|age|    groups|bought    |
+------+--------+---+----------+----------+
|     0|    2189| 25|       148|car       |
|     0|    2192| 34|       148|house     |
|     1|    2193| 37|       148|car       |
|     1|    2194| 38|       148|house     |
|     1|    2196| 54|       148|laptop    |
|     1|    2197| 27|       148|laptop    |
|     0|    2198| 44|       148|laptop    |
+------+--------+---+----------+----------+

Gender has the values 0 and 1, so all the people in this group will be changed to "person". If they were all 1 the label would be "female", and if all 0 it would be "male", but that rule doesn't apply to this group.

Now postcodes: the lowest in this group is 2189 and the highest is 2198, so each case will change to [2189 - 2198].

For age, the lowest in this group is 25 and the highest is 54, so it will be [25-54].

For bought, I need to check which distinct items have been bought; here they are [car,house,laptop].

So, this group's recoding will end up as:

+------+-------------+--------+----------+------------------+
|gender|     postcode|     age|    groups|        bought    |
+------+-------------+--------+----------+------------------+
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]|       148|[car,house,laptop]|
+------+-------------+--------+----------+------------------+

And that should be done for every group in the dataframe.

Any ideas? I found something similar, but it is in Scala. Thank you in advance!

Upvotes: 1

Views: 356

Answers (1)

Prem

Reputation: 11955

Hope this helps!

import pyspark.sql.functions as f
from pyspark.sql.types import StringType

# Assumes a SparkContext `sc`, as in the pyspark shell
df = sc.parallelize([
    [0, 2189, 25, 148, 'car'],
    [0, 2192, 34, 148, 'house'],
    [1, 2193, 37, 148, 'car'],
    [1, 2194, 38, 148, 'house'],
    [1, 2196, 54, 148, 'laptop'],
    [1, 2197, 27, 148, 'laptop'],
    [0, 2198, 44, 148, 'laptop']
]).toDF(('gender', 'postcode', 'age', 'groups', 'bought'))
df.show()

# Distinct items bought per group
df1 = df.groupBy("groups").agg(f.collect_set("bought").alias("bought"))
# Age range per group, formatted as "min-max"
df2 = df.groupBy("groups").agg(f.min("age"), f.max("age")). \
    withColumn("age", f.concat(f.col("min(age)"), f.lit("-"), f.col("max(age)"))).select("groups", "age")
# Postcode range per group, formatted as "min-max"
df3 = df.groupBy("groups").agg(f.min("postcode"), f.max("postcode")). \
    withColumn("postcode", f.concat(f.col("min(postcode)"), f.lit("-"), f.col("max(postcode)"))).select("groups", "postcode")

# Map the set of genders in a group to a label.
# collect_set gives no ordering guarantee, so sort before comparing.
def modify_values(l):
    if sorted(l) == [0, 1]:
        return "person"
    elif l == [0]:
        return "male"
    else:
        return "female"
modified_val = f.udf(modify_values, StringType())
df4 = df.groupBy("groups").agg(f.collect_set("gender").alias("genders")). \
    withColumn("gender", modified_val("genders")).select("groups", "gender")

# Combine the per-group summaries into one row per group
merged_df = df1.join(df2, "groups").join(df3, "groups").join(df4, "groups")
merged_df.show()

Output is:

+------+--------------------+-----+---------+------+
|groups|              bought|  age| postcode|gender|
+------+--------------------+-----+---------+------+
|   148|[laptop, house, car]|25-54|2189-2198|person|
+------+--------------------+-----+---------+------+
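If you need the recoded values repeated on every original row, as in the expected output in the question, a minimal sketch (reusing merged_df from above) is to join the per-group summary back to the original rows on groups:

# Attach the per-group summary to every original row
recoded = df.select("groups").join(merged_df, "groups") \
    .select("gender", "postcode", "age", "groups", "bought")
recoded.show(truncate=False)

This yields one identical summary row per original row, matching the layout shown in the question.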


Don't forget to let us know if it solved your problem.
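P.S. If you would rather avoid the Python UDF, the gender mapping can also be expressed with built-in functions only (a sketch under the same assumptions; df4_alt is just an illustrative name):

# Built-in equivalent of the UDF: both genders -> person, only 1 -> female, only 0 -> male
df4_alt = df.groupBy("groups").agg(f.collect_set("gender").alias("genders")) \
    .withColumn("gender", f.when(f.size("genders") > 1, "person")
                           .when(f.array_contains("genders", 1), "female")
                           .otherwise("male")) \
    .select("groups", "gender")

Built-in functions run on the JVM, so this usually avoids the serialization overhead of a Python UDF.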

Upvotes: 2
