Nandita Dwivedi

Spark: using reduceByKey on a nested key-value structure

My data looks like:

Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url
Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2
Customer2| item1:x1,x7,x3; item3:x5|time3|url3

I want to use reduceByKey on the same customerIds, and then mapValues, to get the union of distinct items for each customerId:

Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6; item1:x1,x7,x3; item3:x5

I am able to achieve this with:

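val line = spark.sparkContext.textFile(args(0))
// "\\|" escapes the pipe for split's regex; splitting column 2 on ";" lets distinct drop whole segments
val record = line.map(l => l.split("\\|")).map(l => (l(0), l(1).split(";").map(_.trim).toSeq))
  .reduceByKey((x, y) => x ++ y).mapValues(x => x.distinct)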
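A quick way to check this reduce step locally, without a cluster, is to mimic reduceByKey with plain Scala collections (groupBy followed by reduce). This is only a sketch: it assumes the second column is split on ";" into trimmed segments before the union, because calling distinct on an unsplit String removes duplicate characters rather than duplicate segments. The object name UnionDistinctCheck is hypothetical.

```scala
object UnionDistinctCheck {
  // Sample input copied from the question
  val lines: Seq[String] = Seq(
    "Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url",
    "Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2",
    "Customer2| item1:x1,x7,x3; item3:x5|time3|url3"
  )

  // Key by customer; split column 2 into trimmed segments such as "item1:x1,x2,x3"
  val pairs: Seq[(String, Seq[String])] = lines.map { l =>
    val cols = l.split("\\|")
    (cols(0), cols(1).split(";").map(_.trim).toSeq)
  }

  // Local stand-in for reduceByKey((x, y) => x ++ y).mapValues(_.distinct):
  // concatenate each customer's segments, then keep the first occurrence of each
  val merged: Map[String, Seq[String]] = pairs
    .groupBy(_._1)
    .map { case (customer, ps) => customer -> ps.map(_._2).reduce(_ ++ _).distinct }
}
```

merged("Customer1") contains the same five segments as the expected output above, in first-seen order.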

Now I want each item in the second column to be unique as well: all values for the same item key should be merged using union and distinct, to get something like:

Customer1| item1:x1,x2,x3,x6,x7; item2:x1,x4,x5; item3:x5
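One way to express this per-item merge, sketched here under the assumption that each record's second column is parsed into a Map from item key to a Set of values, is a per-key set union. That function is associative and commutative, which is what reduceByKey requires, so it could be passed to reduceByKey in place of the plain union. The names ItemMerge, parseItems and mergeItems are hypothetical.

```scala
object ItemMerge {
  // item key -> set of distinct values, e.g. "item1" -> Set("x1", "x2", "x3")
  type Items = Map[String, Set[String]]

  // Per-key set union; associative and commutative, so safe for reduceByKey
  def mergeItems(a: Items, b: Items): Items =
    b.foldLeft(a) { case (acc, (key, values)) =>
      acc.updated(key, acc.getOrElse(key, Set.empty[String]) ++ values)
    }

  // Parses " item1:x1,x2,x3; item2:x1,x4,x5" into an Items map.
  // A key repeated within one record (item1 in the first line) is merged too.
  def parseItems(column: String): Items =
    column.split(";").map(_.trim).filter(_.nonEmpty)
      .map { segment =>
        val parts = segment.split(":", 2)
        Map(parts(0).trim -> parts(1).split(",").map(_.trim).toSet)
      }
      .foldLeft(Map.empty[String, Set[String]])((acc, m) => mergeItems(acc, m))
}
```

In Spark this would be used as something like `line.map(_.split("\\|")).map(l => (l(0), ItemMerge.parseItems(l(1)))).reduceByKey((a, b) => ItemMerge.mergeItems(a, b))`; that wiring is untested against a live cluster.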

Once this is done, I want to get the frequency of each x, e.g. x1:2, x2:1 ...., and update a vector of x(1-10) for each customerId with those frequencies.

Can this be achieved in spark?

Answers (1)

Ben Horsburgh

Yes, you can certainly do this in Spark! The way you have approached the problem has made it seem a bit more difficult than it actually is, however.

So that I can show a complete example you can paste into the REPL, let's assume your data is stored in a String rather than read from the args(0) file:

val data = """Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url
Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2
Customer2| item1:x1,x7,x3; item3:x5|time3|url3"""

The RDD you call "line" can then be created as an RDD named "rdd" with:

val rdd = sc.parallelize(data.split("\n"))

So far nothing new. The next step is the important one. Instead of doing the counting and aggregating in layers, we can prepare our data to do it all in one go. This is more readable, and also more efficient: a single flatMap prepares the records and a single reduceByKey then does all the counting, so the data is shuffled only once.

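val mapped = rdd.flatMap(line => {
   val arr = line.split("\\|")   // customer | items | time | url
   val customer = arr(0).trim
   val items = arr(1)
   val time = arr(2)
   val url = arr(3)

   // one output row per (item, value) pair, e.g. " item1:x1,x2,x3" -> 3 rows
   items.split(";").flatMap(item => {
      val itemKey = item.split(":")(0).trim   // drop the space left after ";" or "|"
      val itemValues = item.split(":")(1).split(",").map(_.trim)

      itemValues.map(value => (customer, itemKey, value, time, url))
   })
})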

To see what this contains, we can print it nicely with mapped.toDF("customer", "itemId", "itemValue", "time", "url").show (toDF on an RDD of tuples needs import spark.implicits._, which spark-shell does automatically):

+---------+------+---------+-----+----+
| customer|itemId|itemValue| time| url|
+---------+------+---------+-----+----+
|Customer1| item1|       x1|time1| url|
|Customer1| item1|       x2|time1| url|
|Customer1| item1|       x3|time1| url|
|Customer1| item2|       x1|time1| url|
|Customer1| item2|       x4|time1| url|
|Customer1| item2|       x5|time1| url|
|Customer1| item1|       x1|time1| url|
|Customer1| item1|       x3|time1| url|
|Customer1| item1|       x6|time1| url|
|Customer1| item1|       x1|time2|url2|
|Customer1| item1|       x7|time2|url2|
|Customer1| item1|       x3|time2|url2|
|Customer1| item2|       x1|time2|url2|
|Customer1| item2|       x4|time2|url2|
|Customer1| item2|       x5|time2|url2|
|Customer1| item3|       x5|time2|url2|
|Customer2| item1|       x1|time3|url3|
|Customer2| item1|       x7|time3|url3|
|Customer2| item1|       x3|time3|url3|
|Customer2| item3|       x5|time3|url3|
+---------+------+---------+-----+----+

Finally, we can count how often each value appears for every (customer, item) pair, which gives the frequencies you need for your vector:

val reduced = mapped.map{case (customer, itemKey, itemValue, time, url) => ((customer, itemKey, itemValue), 1)}.
   reduceByKey(_+_).
   map{case ((customer, itemKey, itemValue), count) => (customer, itemKey, itemValue, count)}

and view it: reduced.toDF("customer", "itemKey", "itemValue", "count").show

+---------+-------+---------+-----+
| customer|itemKey|itemValue|count|
+---------+-------+---------+-----+
|Customer1|  item1|       x2|    1|
|Customer1|  item1|       x1|    3|
|Customer2|  item1|       x7|    1|
|Customer1|  item1|       x6|    1|
|Customer1|  item1|       x7|    1|
|Customer2|  item1|       x3|    1|
|Customer2|  item3|       x5|    1|
|Customer1|  item2|       x5|    2|
|Customer1|  item2|       x4|    2|
|Customer1|  item2|       x1|    2|
|Customer1|  item3|       x5|    1|
|Customer1|  item1|       x3|    3|
|Customer2|  item1|       x1|    1|
+---------+-------+---------+-----+
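If you want to sanity-check these counts without a cluster, the same explode-and-count can be mimicked with plain Scala collections; groupBy(identity) followed by taking each group's size plays the role of map(k => (k, 1)).reduceByKey(_ + _). This is a local sketch only, and LocalCounts is a hypothetical name.

```scala
object LocalCounts {
  // Same three sample lines as the data String above
  val lines: Seq[String] = Seq(
    "Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url",
    "Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2",
    "Customer2| item1:x1,x7,x3; item3:x5|time3|url3"
  )

  // Same explode step as the flatMap above, keeping only (customer, itemKey, itemValue)
  val exploded: Seq[(String, String, String)] = lines.flatMap { line =>
    val arr = line.split("\\|")
    arr(1).split(";").toSeq.flatMap { item =>
      val parts = item.split(":")
      parts(1).split(",").toSeq.map(v => (arr(0).trim, parts(0).trim, v.trim))
    }
  }

  // Local stand-in for map(k => (k, 1)).reduceByKey(_ + _)
  val counts: Map[(String, String, String), Int] =
    exploded.groupBy(identity).map { case (key, occurrences) => key -> occurrences.size }
}
```

The resulting map agrees with the table above, e.g. 3 for (Customer1, item1, x1), with 13 distinct keys in total.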

If you need everything grouped into an Array/Seq representation of a vector, you can do this by further aggregating the data. Hope this helps!
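As a sketch of that further aggregation, the code below builds a dense Array[Int] of length 10 per customer. It assumes the values are always named x1 to x10 (the question's x(1-10)), and that the frequency wanted is the number of distinct items a value appears in, which is what the question's x1:2, x2:1 example gives for Customer1. Because each row of reduced is already a distinct (customer, itemKey, itemValue), that is simply the number of rows per (customer, itemValue); for total occurrences you would add the count column instead of 1. CustomerVectors, addValue and mergeVectors are hypothetical names.

```scala
object CustomerVectors {
  // Length taken from the question's "x(1-10)"; an assumption, not derived from the data
  val VectorSize = 10

  // Maps "x1".."x10" to indices 0..9 (assumes that naming scheme)
  def indexOf(value: String): Int = value.stripPrefix("x").toInt - 1

  // seqOp: adds one occurrence of `value` to the vector (mutates and returns it)
  def addValue(vec: Array[Int], value: String): Array[Int] = {
    vec(indexOf(value)) += 1
    vec
  }

  // combOp: sums two partial vectors element-wise (mutates and returns the first)
  def mergeVectors(a: Array[Int], b: Array[Int]): Array[Int] = {
    for (i <- a.indices) a(i) += b(i)
    a
  }

  // Local stand-in for the (untested) Spark version:
  //   reduced.map { case (customer, _, itemValue, _) => (customer, itemValue) }
  //     .aggregateByKey(Array.fill(VectorSize)(0))((v, x) => addValue(v, x), (a, b) => mergeVectors(a, b))
  def localVectors(rows: Seq[(String, String)]): Map[String, Array[Int]] =
    rows.groupBy(_._1).map { case (customer, rs) =>
      customer -> rs.map(_._2).foldLeft(Array.fill(VectorSize)(0))((vec, v) => addValue(vec, v))
    }
}
```

With the (customer, itemValue) pairs from the table above, Customer1 comes out as 2, 1, 1, 1, 2, 1, 1, 0, 0, 0. aggregateByKey is chosen over groupByKey here because it combines partial vectors before the shuffle.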
