Shibani

Reputation: 148

How to get a Tuple for the grouped by result on a Spark Dataframe?

I am trying to group entities by id. Running the following code, I have this dataframe:

val pet_type_count = pet_list.groupBy("id","pets_type").count()
pet_type_count.sort("id").limit(20).show
+----------+---------------------+-----+
|        id|            pets_type|count|
+----------+---------------------+-----+
|         0|                    0|    2|
|         1|                    0|    3|
|         1|                    3|    3|
|        10|                    0|    4|
|        10|                    1|    1|
|        13|                    0|    3|
|        16|                    1|    3|
|        17|                    1|    1|
|        18|                    1|    2|
|        18|                    0|    1|
|        19|                    1|    7|
+----------+---------------------+-----+

I want to group the results of this group by on id so that it returns a list of tuples per id, to which I can apply the following udf:

val agg_udf = udf { (v1: List[Tuple2[String, String]]) =>
    var feature_vector = Array.fill(5)(0)
    for (row <- v1) {
      val index = (5 - row._1.toInt)
      feature_vector(index) = row._2.toInt
    }
    feature_vector
}

val pet_vector_included = pet_type_count.groupBy("id").agg(agg_udf(col("pets_type_count")).alias("pet_count_vector"))

For that, I need to get the following:

+----------+---------------------------+
|        id|            pets_type_count|
+----------+---------------------------+
|         0|                      (0,2)|
|         1|                      (0,3)|
|          |                      (3,3)|
|        10|                      (0,4)|
|          |                      (1,1)|
|        13|                      (0,3)|
|        16|                      (1,3)|
|        17|                      (1,1)|
|        18|                      (1,2)|
|          |                      (0,1)|
|        19|                      (1,7)|
+----------+---------------------------+

I am unable to figure out how to get the tuples after the groupBy on id. Any help would be appreciated!

Upvotes: 2

Views: 5353

Answers (1)

Ramesh Maharjan

Reputation: 41957

You can use the built-in struct function to combine the pets_type and count columns into a single column, and the built-in collect_list function to collect that newly formed column when grouping by id. An orderBy on the id column keeps the resulting dataframe sorted.
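
If you want to try this end to end, here is a minimal sketch of a toy df with the same columns as the grouped output shown in the question (hypothetical sample rows, not the real data; integer columns are assumed so that the getAs[Int] calls further down work unchanged):

// Hypothetical toy data, just to make the snippet runnable in a spark-shell
import spark.implicits._

val df = Seq(
  (0, 0, 2),
  (1, 0, 3),
  (1, 3, 3),
  (10, 0, 4),
  (10, 1, 1)
).toDF("id", "pets_type", "count")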

import org.apache.spark.sql.functions._

// df holds the grouped (id, pets_type, count) rows shown above
val pet_type_count = df.withColumn("struct", struct("pets_type", "count")) // combine the two columns into one struct column
  .groupBy("id").agg(collect_list(col("struct")).as("pets_type_count"))    // collect the structs into a list per id
  .orderBy("id")

This should give you your desired result:

+---+---------------+
|id |pets_type_count|
+---+---------------+
|0  |[[0,2]]        |
|1  |[[0,3], [3,3]] |
|10 |[[0,4], [1,1]] |
|13 |[[0,3]]        |
|16 |[[1,3]]        |
|17 |[[1,1]]        |
|18 |[[1,2], [0,1]] |
|19 |[[1,7]]        |
+---+---------------+

Then you can apply the udf function that you defined (with a few modifications) as below:

import org.apache.spark.sql.Row

// Each element of the collected list arrives in the udf as a Row,
// so the struct fields are read with getAs instead of tuple accessors.
val agg_udf = udf { (v1: Seq[Row]) =>
  val feature_vector = Array.fill(5)(0)
  for (row <- v1) {
    val index = 4 - row.getAs[Int](0)         // pets_type picks the slot in the vector
    feature_vector(index) = row.getAs[Int](1) // count is stored in that slot
  }
  feature_vector
}

val pet_vector_included = pet_type_count.withColumn("pet_count_vector", agg_udf(col("pets_type_count")))

pet_vector_included.show(false)

which should give you:

+---+---------------+----------------+
|id |pets_type_count|pet_count_vector|
+---+---------------+----------------+
|0  |[[0,2]]        |[0, 0, 0, 0, 2] |
|1  |[[0,3], [3,3]] |[0, 3, 0, 0, 3] |
|10 |[[0,4], [1,1]] |[0, 0, 0, 1, 4] |
|13 |[[0,3]]        |[0, 0, 0, 0, 3] |
|16 |[[1,3]]        |[0, 0, 0, 3, 0] |
|17 |[[1,1]]        |[0, 0, 0, 1, 0] |
|18 |[[1,2], [0,1]] |[0, 0, 0, 2, 1] |
|19 |[[1,7]]        |[0, 0, 0, 7, 0] |
+---+---------------+----------------+
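
As a side note, if the set of pets_type values is known and small, a pivot can build one count column per type without a udf. A rough sketch (not part of the original answer), assuming pets_type holds the integer values 0 through 4 and pet_list is the original dataframe from the question:

// Hypothetical alternative: one count column per pets_type value,
// with missing (id, pets_type) combinations filled with 0
val per_type_counts = pet_list
  .groupBy("id")
  .pivot("pets_type", Seq(0, 1, 2, 3, 4))
  .count()
  .na.fill(0)
  .orderBy("id")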

I hope the answer is helpful.

Upvotes: 9
