acs254

Reputation: 533

Reduce Key with Ordered List of Values

I have the following RDD:

| Key | Value | Date       |
|-----|-------|------------|
| 1   | A     | 10/30/2016 |
| 1   | B     | 10/31/2016 |
| 1   | C     | 11/1/2016  |
| 1   | D     | 11/2/2016  |
| 2   | A     | 11/2/2016  |
| 2   | B     | 11/2/2016  |
| 2   | C     | 11/2/2016  |
| 3   | A     | 10/30/2016 |
| 3   | B     | 10/31/2016 |
| 3   | C     | 11/1/2016  |
| 3   | D     | 11/2/2016  |

And I would like to transform it to the following RDD:

| Key | List         |
|-----|--------------|
| 1   | (A, B, C, D) |
| 2   | (A, B, C)    |
| 3   | (A, B, C, D) |

That is, Key, List(Value), where the list of values is ordered by the corresponding dates. All keys, obviously, will be unique, but not all values will necessarily be unique. I would still like to list all values. How can I accomplish this?

Upvotes: 0

Views: 180

Answers (1)

Nagarjuna Pamu

Reputation: 14825

Create a model to represent the data. (You could use tuples as well, but code built on tuples gets ugly quickly; it's always good to have names for the fields.)

case class DataItem(key: Int, value: String, timeInMillis: Long)

then

parse the data (you can use Joda-Time's DateTimeFormat to parse the dates) and then create your RDD:

val rdd = sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345)))
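Here 123, 1234, etc. stand in for the parsed millisecond timestamps. A minimal sketch of the parsing step, assuming the question's dates follow the M/d/yyyy pattern (the pattern and the raw tuples here are illustrative assumptions):

import org.joda.time.format.DateTimeFormat

// Joda-Time formatter; M/d/yyyy also accepts single-digit days like 11/1/2016
val fmt = DateTimeFormat.forPattern("M/d/yyyy")

// raw (key, value, date) rows with the dates still as strings
val raw = List((1, "A", "10/30/2016"), (1, "B", "10/31/2016"), (1, "C", "11/1/2016"))

// convert each date to epoch millis so sortBy can order numerically
val parsed = sc.parallelize(raw.map { case (k, v, d) =>
  DataItem(k, v, fmt.parseDateTime(d).getMillis)
})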

and then, as the final step, groupBy key and sortBy time:

rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}

Scala REPL

scala> case class DataItem(key: Int, value: String, timeInMillis: Long)
defined class DataItem

scala> val rdd = sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345)))
rdd: org.apache.spark.rdd.RDD[DataItem] = ParallelCollectionRDD[13] at parallelize at <console>:35

scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}
res11: org.apache.spark.rdd.RDD[(Int, List[DataItem])] = MapPartitionsRDD[16] at map at <console>:38

scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}.foreach(println)
(1,List(DataItem(1,A,123)))
(2,List(DataItem(2,B,1234), DataItem(2,C,12345)))

scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}.map { case (k, v) => (k, v.map(_.value)) }.foreach(println)
(1,List(A))
(2,List(B, C))
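Putting the pieces together on the sample data from the question, a sketch (still assuming the M/d/yyyy date pattern) that yields the requested (Key, List) shape:

import org.joda.time.format.DateTimeFormat

val fmt = DateTimeFormat.forPattern("M/d/yyyy")
val rows = List(
  (1, "A", "10/30/2016"), (1, "B", "10/31/2016"), (1, "C", "11/1/2016"), (1, "D", "11/2/2016"),
  (2, "A", "11/2/2016"), (2, "B", "11/2/2016"), (2, "C", "11/2/2016"),
  (3, "A", "10/30/2016"), (3, "B", "10/31/2016"), (3, "C", "11/1/2016"), (3, "D", "11/2/2016")
)
val items = sc.parallelize(rows.map { case (k, v, d) =>
  DataItem(k, v, fmt.parseDateTime(d).getMillis)
})

// group by key, sort each group by time, then keep only the values
items
  .groupBy(_.key)
  .map { case (k, vs) => k -> vs.toList.sortBy(_.timeInMillis).map(_.value) }
  .collect()
  .foreach(println)
// (1,List(A, B, C, D))   <- printed line order may vary
// (2,List(A, B, C))
// (3,List(A, B, C, D))

One design note: groupBy shuffles every value for a key to a single executor, which is fine at this scale, but for very large groups a reduceByKey-style aggregation would be kinder to memory.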

Upvotes: 1
