Sree51
Sree51

Reputation: 99

RDD Key-Value pair with composite value

I have here a toy data set for which I need to compute list of cities in each state and population of that state(sum of population of all the cities in that state)Data

I want to do it using RDDs without using groupByKey and joins. My approach so far:

In this approach I used 2 separate key-value pairs and joined them.

val rdd1=inputRdd.map(x=>(x._1,x._3.toInt))
val rdd2=inputRdd.map(x=>(x._1,x._2))
val popn_sum=rdd1.reduceByKey(_+_)
val list_cities=rdd2.reduceByKey(_++_)
popn_sum.join(list_cities).collect()

Is it possible to get the same output with just 1 key-value pair and without any joins. I have created a new key-value pair, but I do not know how to proceed to get the same output using aggregateByKey or reduceByKey with this RDD:

val rdd3=inputRdd.map(x=>(x._1,(List(x._2),x._3))) 

I am new to spark and want to learn the best way get this output.

Array((B,(12,List(B1, B2))), (A,(6,List(A1, A2, A3))), (C,(8,List(C1, C2))))

Thanks in advance

Upvotes: 1

Views: 666

Answers (1)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

If your inputRdd is of type

inputRdd: org.apache.spark.rdd.RDD[(String, String, Int)]

Then you can achieve your desired result by simply using one reduceByKey as

inputRdd.map(x => (x._1, (List(x._2), x._3.toInt))).reduceByKey((x, y) => (x._1 ++ y._1, x._2+y._2))

and you can it with aggregateByKey as

inputRdd.map(x => (x._1, (List(x._2), x._3.toInt))).aggregateByKey((List.empty[String], 0))((x, y) => (x._1 ++ y._1, x._2+y._2), (x, y) => (x._1 ++ y._1, x._2+y._2))

DataFrame way

Even better approach would be to use dataframe way. You can convert your rdd to dataframe simply by applying .toDF("state", "city", "population") which should give you

+-----+----+----------+
|state|city|population|
+-----+----+----------+
|A    |A1  |1         |
|B    |B1  |2         |
|C    |C1  |3         |
|A    |A2  |2         |
|A    |A3  |3         |
|B    |B2  |10        |
|C    |C2  |5         |
+-----+----+----------+

After that you can just use groupBy, and apply collect_list and sum inbuilt aggregation functions as

import org.apache.spark.sql.functions._
inputDf.groupBy("state").agg(collect_list(col("city")).as("cityList"), sum("population").as("sumPopulation"))

which should give you

+-----+------------+-------------+
|state|cityList    |sumPopulation|
+-----+------------+-------------+
|B    |[B1, B2]    |12           |
|C    |[C1, C2]    |8            |
|A    |[A1, A2, A3]|6            |
+-----+------------+-------------+

Dataset is almost the same but comes with additional type-safety

Upvotes: 1

Related Questions