ande

Reputation: 67

PySpark grouping and structuring data

I have the following data in Spark 2.4.5:

data = [
    ('1234', '203957', '2010', 'London', 'CHEM'),
    ('1234', '203957', '2010', 'London', 'BIOL'),
    ('1234', '288400', '2012', 'Berlin', 'MATH'),
    ('1234', '288400', '2012', 'Berlin', 'CHEM'),
]
d = spark.createDataFrame(data, ['auid', 'eid', 'year', 'city', 'subject'])
d.show()

+----+------+----+------+-------+
|auid|   eid|year|  city|subject|
+----+------+----+------+-------+
|1234|203957|2010|London|   CHEM|
|1234|203957|2010|London|   BIOL|
|1234|288400|2012|Berlin|   MATH|
|1234|288400|2012|Berlin|   CHEM|
+----+------+----+------+-------+

from which I need to get a df grouped by auid, with the cities in chronological order, i.e. London, Berlin, and [[London, 2010], [Berlin, 2012]] in another column. I also need a column with the subjects sorted by descending frequency: [CHEM, 2], [BIOL, 1], [MATH, 1], or simply [CHEM, BIOL, MATH].
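
In other words, for this sample the target output would look roughly like this (the order among the 1-count subjects doesn't matter):

+----+--------------------------------+------------------+
|auid|city_set                        |subjects          |
+----+--------------------------------+------------------+
|1234|[[London, 2010], [Berlin, 2012]]|[CHEM, BIOL, MATH]|
+----+--------------------------------+------------------+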

I tried this:

from pyspark.sql import functions as func

d.groupBy('auid').agg(func.collect_set(func.struct('city', 'year')).alias('city_set')).show(10, False)

which leads to this:

+----+--------------------------------+
|auid|city_set                        |
+----+--------------------------------+
|1234|[[Berlin, 2012], [London, 2010]]|
+----+--------------------------------+

Here I am stuck and need help. (I would appreciate a hint on how to sort the values in city_set.)

Upvotes: 2

Views: 409

Answers (1)

jxc

Reputation: 13998

You can aggregate with collect_set on struct('year', 'city'), sort the array, and then use the transform function to adjust the order of the struct fields. Similarly for subjects, create an array of structs with two fields, cnt and subject, sort that array of structs in descending order, and then retrieve only the subject field:

df_new = d.groupBy('auid').agg(
      # collect unique (year, city) structs; sort_array orders them by year (the first struct field)
      func.sort_array(func.collect_set(func.struct('year', 'city'))).alias('city_set'),
      func.collect_list('subject').alias('subjects')
    ).withColumn('city_set', func.expr("transform(city_set, x -> (x.city as city, x.year as year))")) \
    .withColumn('subjects', func.expr("""
        sort_array(
          /* build a (cnt, subject) struct per distinct subject, where cnt is its frequency */
          transform(array_distinct(subjects), x -> (size(filter(subjects, y -> y=x)) as cnt, x as subject)),
          /* sort descending so the most frequent subject comes first, then keep only `subject` */
          False
        ).subject
      """))

df_new.show(truncate=False) 
+----+--------------------------------+------------------+
|auid|city_set                        |subjects          |
+----+--------------------------------+------------------+
|1234|[[London, 2010], [Berlin, 2012]]|[CHEM, MATH, BIOL]|
+----+--------------------------------+------------------+
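
If you want to double-check the frequencies behind that subject ordering, a plain groupBy/count on the input shows them (the two 1-count subjects can tie in either order):

d.groupBy('subject').count().orderBy(func.desc('count')).show()
# expect: CHEM -> 2, BIOL -> 1, MATH -> 1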

Edit: there are several ways to remove duplicate city entries from the city_set array (i.e. when the same city shows up in more than one year):

  1. use a Window function to adjust year to min(year) for each auid/city, and then repeat the above procedure (a full sketch of this option is shown after this list).

    from pyspark.sql import Window

    d = d.withColumn('year', func.min('year').over(Window.partitionBy('auid','city')))
    
  2. use the aggregate function to remove duplicates from the city_set array:

    df_new = d.groupBy('auid').agg(
        func.sort_array(func.collect_set(func.struct('year', 'city'))).alias('city_set')     
    ).withColumn("city_set", func.expr("""         
        aggregate(        
          /* expr: take slice of city_set array from the 2nd element to the last */
          slice(city_set,2,size(city_set)-1),           
          /* start: initialize `acc` as an array with a single entry city_set[0].city */
          array(city_set[0].city),
          /* merge: iterate through `expr`, if x.city exists in `acc`, keep as-is
           *        , otherwise add an entry to `acc` using concat function */
          (acc,x) -> IF(array_contains(acc,x.city), acc, concat(acc, array(x.city)))                     
        )                              
    """))
    

Note: it would be much easier with Spark 3.0+ though:

df_new = d.groupBy('auid').agg(func.expr("array_sort(collect_set((city,year)), (l,r) -> int(l.year-r.year)) as city_set"))
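
If only the ordered city names are needed, the same transform trick from above can strip the years afterwards:

df_new = df_new.withColumn('cities', func.expr("transform(city_set, x -> x.city)"))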

Upvotes: 2
