Reputation: 3111
First I had a salesList: List[Sale]
and in order to get an ID of the last Sale in the List I've used lastOption
:
val lastSaleId: Option[Any] = salesList.lastOption.map(_.saleId)
But now I've modified a method with List[Sale]
to work with salesListRdd: List[RDD[Sale]]
. So I've changed the way I'm getting an ID of the last Sale:
val lastSaleId: Option[Any] = SparkContext
.union(salesListRdd)
.collect().toList
.lastOption.map(_.saleId)
I'm not sure that it is the best way to go. Because here I'm still collecting RDD to a List which brings it to the driver node and it may cause the driver to run out of memory.
Is there a way to get an ID of the last Sale from an RDD preserving the initial order of records? Not any kind of sorting but the way the Sale objects were originally stored in the List?
Upvotes: 1
Views: 2220
Reputation: 11
There at least two efficient solutions. You can either use top
with zipWithIndex
:
def lastValue[T](rdd: RDD[T]): Option[T] = {
rdd.zipWithUniqueId.map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}
or top
with custom key:
def lastValue[T](rdd: RDD[T]): Option[T] = {
rdd.mapPartitionsWithIndex(
(i, iter) => iter.zipWithIndex.map { case (x, j) => ((i, j), x) }
).top(1)(Ordering[(Int, Long)].on(_._1)).headOption.map(_._2)
}
The former one requires additional action for zipWithIndex
while the latter one doesn't.
Before using please be sure to understand the limitation. Quoting the docs:
Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The unique ID assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
In particular, depending on the exact input, Union
might not preserve the input order at all.
Upvotes: 3
Reputation: 764
You could use zipWithIndex
and sort descending
by it, so that the last record will be on the top, then take(1):
salesListRdd
.zipWithIndex()
.map({ case (x, y) => (y, x) })
.sortByKey(ascending = false)
.map({ case (x, y) => y })
.take(1)
Solution is taken from here: http://www.swi.com/spark-rdd-getting-bottom-records/ However, it is highly inefficient, since It does lots of partition shuffling.
Upvotes: 1