CoMacNo

Reputation: 41

Counting occurrences of a key while keeping several values

I'm having some trouble counting the number of occurrences of a key, while also keeping several values.

Usually I will just do:

val a = file1.map(x => (x, 1)).reduceByKey(_ + _)

which gives the number of occurrences for each key.

However, I also want to keep the values for each occurrence of a key while counting the number of occurrences of that key. Something like this:

val a = file1.map(x => (x(1), (x(2), 1))).reduceByKey{case (x,y) => (x._1, y._1, x._2+y._2)}

For example: if the key x(1) is a country and x(2) is a city, I want to keep all the cities in a country, as well as know how many cities there are in that country.
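Roughly, this is what I'm after (a sketch only; I'm assuming each x in file1 is a split line where x(1) is the country and x(2) the city, and a List is just one possible way to hold the values):

val a = file1
  .map(x => (x(1), (List(x(2)), 1)))                                    // country -> (cities so far, count so far)
  .reduceByKey { case ((cs1, n1), (cs2, n2)) => (cs1 ++ cs2, n1 + n2) } // concatenate the city lists and sum the counts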

Upvotes: 0

Views: 1244

Answers (3)

Ramesh Maharjan

Reputation: 41957

I would suggest you go with dataframes, as dataframes are optimized and easier to use than RDDs.

But if you want to learn about the reduceByKey functionality (i.e. keeping other information, such as the city, as you said) then you can do something like the following.

Let's say you have an RDD such as

val rdd = sc.parallelize(Seq(
  ("country1", "city1"),
  ("country1", "city2"),
  ("country1", "city3"),
  ("country1", "city3"),
  ("country2", "city1"),
  ("country2", "city2")
))

Your attempted reduceByKey would need some modification, like this:

rdd.map(x => (x._1, (Set(x._2), 1)))  // a Set keeps the cities distinct (you could use a List, Array or any other collection)
  .reduceByKey((x, y) => (x._1 ++ y._1, x._2 + y._2))  // union the city sets and sum the counts

which should give you

(country2,(Set(city1, city2),2))
(country1,(Set(city1, city2, city3),4))
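Note that the count for country1 is 4 because city3 appears twice in the input, while the Set only keeps the 3 distinct cities. If what you actually want is the number of distinct cities, one option (just a sketch on the same rdd) is to take the size of the set at the end:

rdd.map(x => (x._1, (Set(x._2), 1)))
  .reduceByKey((x, y) => (x._1 ++ y._1, x._2 + y._2))
  .mapValues { case (cities, _) => (cities, cities.size) }  // replace the record count with the distinct-city count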

I hope the answer is helpful.

If you want to learn about reduceByKey in detail, you can check my detailed answer.

Upvotes: 0

Shaido

Reputation: 28322

In this case, I would recommend using a dataframe instead of an RDD, and using the groupBy and agg methods.

You can easily convert an RDD to a dataframe using the toDF function; just make sure you import the implicits first. Example assuming the RDD has two columns:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val df = rdd.toDF("country", "city")

Then use the groupBy and aggregate the values you want.

df.groupBy("country").agg(collect_set($"city").as("cities"), count($"city").as("count"))

Upvotes: 0

Roberto Congiu

Reputation: 5213

It's complicated and redundant to keep the count of the cities together with the list itself. You can just collect all the cities and add the size at the end.

It is of course easier if you use the dataframe interface (assuming a dataframe with columns (key: Int, city: String)):

import org.apache.spark.sql.{ functions => f}
import spark.implicits._
df.groupBy($"key").
   agg(f.collect_set($"city").as("cities")).
   withColumn("ncities", f.size($"cities"))

but you can do something similar with the raw RDD (assuming the input is tuples of (id, city)):

rdd.map { case (id, city) => (id, Set(city)) }.
   reduceByKey { case (x, y) => x ++ y }.
   map { case (id, cities) => (id, cities, cities.size) }
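On the same sample (country, city) pairs as in the first answer, this should produce something like:

(country1,Set(city1, city2, city3),3)
(country2,Set(city1, city2),2)

Note that the last element here is the number of distinct cities, not the number of input records; keep an explicit counter as in the other answers if you need the latter.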

Upvotes: 1
