Reputation: 41
I'm having some trouble counting the number of occurrences of a key while also keeping the values for each occurrence.
Usually I will just do:
val a = file1.map(x => (x, 1)).reduceByKey(_ + _)
which gives the number of occurrences for each key.
However, I also want to keep the values for each occurrence of a key, at the same time as counting the number of occurrences of the key. Something like this:
val a = file1.map(x => (x(1), (x(2), 1))).reduceByKey{case (x,y) => (x._1, y._1, x._2+y._2)}
For example: if the key x(1) is a country and x(2) is a city, I want to keep all the cities in a country, as well as knowing how many cities there are in a country.
Upvotes: 0
Views: 1244
Reputation: 41957
I would suggest you go with dataframes, as dataframes are optimized and easier to use than rdds.
But if you want to learn about the reduceByKey functionality (i.e. keeping other information along with the count, such as the city information you mentioned), then you can do something like below.
Let's say you have an rdd as
val rdd = sc.parallelize(Seq(
("country1", "city1"),
("country1", "city2"),
("country1", "city3"),
("country1", "city3"),
("country2", "city1"),
("country2", "city2")
))
Your attempted reduceByKey would need some modification, like this:
rdd.map(x => (x._1, (Set(x._2), 1))) // Set is used to get distinct cities (you can use a List, Array or any other collection)
  .reduceByKey((x, y) => (x._1 ++ y._1, x._2 + y._2)) // city sets are merged and counts are summed
which should give you
(country2,(Set(city1, city2),2))
(country1,(Set(city1, city2, city3),4))
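Note that the 4 for country1 counts occurrences (city3 appears twice), while the Set keeps only the 3 distinct cities. If you want the distinct-city count instead, a minimal follow-up sketch (assuming the pipeline above is bound to a val named counted, which is my naming, not from the question) would be:
val counted = rdd.map(x => (x._1, (Set(x._2), 1)))
  .reduceByKey((x, y) => (x._1 ++ y._1, x._2 + y._2))

// replace the occurrence counter with the size of the set itself
val distinctCounts = counted.mapValues { case (cities, _) => (cities, cities.size) }

distinctCounts.collect().foreach(println)
// output order may vary:
// (country2,(Set(city1, city2),2))
// (country1,(Set(city1, city2, city3),3))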
I hope the answer is helpful. If you want to learn about reduceByKey in detail, you can check my detailed answer.
Upvotes: 0
Reputation: 28322
In this case, I would recommend using a dataframe instead of an RDD, and using the groupBy and agg methods.
You can easily convert an RDD to a dataframe using the toDF function; just make sure you import the implicits first. Example assuming the RDD has two columns:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val df = rdd.toDF("country", "city")
Then use groupBy and aggregate the values you want:
df.groupBy("country").agg(collect_set($"city").as("cities"), count($"city").as("count"))
Upvotes: 0
Reputation: 5213
It's complicated and redundant to keep the count of the cities together with the list itself. You can just collect all the cities, and add the size at the end.
It is of course easier if you use the dataframe interface (assuming a dataframe of (key: Int, city: String)):
import org.apache.spark.sql.{functions => f}
import spark.implicits._

df.groupBy($"key").
  agg(f.collect_set($"city").as("cities")).
  withColumn("ncities", f.size($"cities"))
but you can do something similar with a raw rdd (I am assuming input tuples of (id, city)):
rdd.map { x => (x._1, Set(x._2)) }.       // tuples, so use the _1/_2 accessors
  reduceByKey { case (x, y) => x ++ y }.  // merge the city sets per key
  map { case (x, y) => (x, y, y.size) }   // append the distinct-city count
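For a quick end-to-end check, here is a sketch using the sample data from the first answer (the val names are illustrative):
val rdd = sc.parallelize(Seq(
  ("country1", "city1"),
  ("country1", "city2"),
  ("country1", "city3"),
  ("country1", "city3"),
  ("country2", "city1"),
  ("country2", "city2")
))

val withSizes = rdd.map { x => (x._1, Set(x._2)) }.
  reduceByKey { case (x, y) => x ++ y }.
  map { case (x, y) => (x, y, y.size) }

withSizes.collect().foreach(println)
// output order may vary:
// (country2,Set(city1, city2),2)
// (country1,Set(city1, city2, city3),3)

Note that this yields the distinct city count (3 for country1), not the occurrence count (4) that the reduceByKey answer above produces.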
Upvotes: 1