Reputation: 533
I have the following RDD:
| Key | Value | Date |
|-----|-------|------------|
| 1 | A | 10/30/2016 |
| 1 | B | 10/31/2016 |
| 1 | C | 11/1/2016 |
| 1 | D | 11/2/2016 |
| 2 | A | 11/2/2016 |
| 2 | B | 11/2/2016 |
| 2 | C | 11/2/2016 |
| 3 | A | 10/30/2016 |
| 3 | B | 10/31/2016 |
| 3 | C | 11/1/2016 |
| 3 | D | 11/2/2016 |
And I would like to transform it to the following RDD:
| Key | List |
|-----|--------------|
| 1 | (A, B, C, D) |
| 2 | (A, B, C) |
| 3 | (A, B, C, D) |
That is, (Key, List(Value)), where the list of values is ordered by the corresponding dates. All keys, obviously, will be unique, but not all values will necessarily be unique. I would still like to list all values. How can I accomplish this?
Upvotes: 0
Views: 180
Reputation: 14825
Create a model to represent the data. (You could use tuples as well, but code built on tuples becomes ugly very quickly; it's always good to have names for the fields.)
case class DataItem(key: Int, value: String, timeInMillis: Long)
Then parse the data (you can use Joda's DateTimeFormat to parse the dates into milliseconds) and create your RDD:
val rdd = sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345)))
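The literal millis above are placeholders. As a sketch of the parsing step, the question's M/d/yyyy date strings can also be converted with the JDK's own java.time (an alternative to Joda); toMillis here is a hypothetical helper name:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Parses dates like "10/30/2016" or "11/1/2016" to epoch millis (UTC midnight).
val fmt = DateTimeFormatter.ofPattern("M/d/yyyy")

def toMillis(s: String): Long =
  LocalDate.parse(s, fmt).toEpochDay * 86400000L
```

Epoch millis compare the same way the dates do, which is all the later sortBy needs.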
Then the final step: groupBy key and sortBy time:
rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}
Scala REPL
scala> case class DataItem(key: Int, value: String, timeInMillis: Long)
defined class DataItem
scala> val rdd = sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345)))
rdd: org.apache.spark.rdd.RDD[DataItem] = ParallelCollectionRDD[13] at parallelize at <console>:35
scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}
res11: org.apache.spark.rdd.RDD[(Int, List[DataItem])] = MapPartitionsRDD[16] at map at <console>:38
scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}.foreach(println)
(1,List(DataItem(1,A,123)))
(2,List(DataItem(2,B,1234), DataItem(2,C,12345)))
scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}.map { case (k, v) => (k, v.map(_.value)) }.foreach(println)
(1,List(A))
(2,List(B, C))
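The same groupBy / sortBy / map chain can be sanity-checked locally, since plain Scala collections share those combinators with RDDs. A sketch using the question's full data set (with dates already reduced to ordered millis); note that for rows sharing a date, as under key 2, Spark gives no guaranteed order within the group after the shuffle:

```scala
case class DataItem(key: Int, value: String, timeInMillis: Long)

// The question's rows, with dates stood in by ordered millis.
val data = List(
  DataItem(1, "A", 1), DataItem(1, "B", 2), DataItem(1, "C", 3), DataItem(1, "D", 4),
  DataItem(2, "A", 4), DataItem(2, "B", 4), DataItem(2, "C", 4),
  DataItem(3, "A", 1), DataItem(3, "B", 2), DataItem(3, "C", 3), DataItem(3, "D", 4)
)

// Group by key, sort each group by time, keep only the values.
val result: Map[Int, List[String]] =
  data.groupBy(_.key).map { case (k, v) =>
    k -> v.sortBy(_.timeInMillis).map(_.value)
  }
```

On a List the sort is stable, so key 2's ties keep their input order; on an RDD they may come back in any order.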
Upvotes: 1