samba

Reputation: 3111

Scala - How to select the last element from an RDD?

At first I had a salesList: List[Sale], and to get the ID of the last Sale in the List I used lastOption:

val lastSaleId: Option[Any] = salesList.lastOption.map(_.saleId)

But now I've modified the method to work with salesListRdd: List[RDD[Sale]] instead of List[Sale]. So I've changed the way I get the ID of the last Sale:

  // sc is the active SparkContext (union is an instance method, not a static one)
  val lastSaleId: Option[Any] = sc
    .union(salesListRdd)
    .collect().toList
    .lastOption.map(_.saleId)

I'm not sure this is the best way to go, because I'm still collecting the RDD into a List, which brings all of the data to the driver node and may cause the driver to run out of memory.

Is there a way to get the ID of the last Sale from an RDD while preserving the original order of records? Not by any kind of sorting, but by the order in which the Sale objects were originally stored in the List?

Upvotes: 1

Views: 2220

Answers (2)

user11014805

Reputation: 11

There are at least two efficient solutions. You can either use top with zipWithIndex:

import org.apache.spark.rdd.RDD

def lastValue[T](rdd: RDD[T]): Option[T] = {
  // The highest global index marks the last element; headOption covers an empty RDD
  rdd.zipWithIndex().map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}
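
For the question's setup, a minimal usage sketch (assuming an active SparkContext named sc, the salesListRdd: List[RDD[Sale]] from the question, and the lastValue helper above):

// Hypothetical usage: flatten the List[RDD[Sale]] into one RDD, then take the last sale's ID
val lastSaleId: Option[Any] = lastValue(sc.union(salesListRdd)).map(_.saleId)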

or top with a custom key:

def lastValue[T](rdd: RDD[T]): Option[T] = {
  // Key each element by (partition index, position within partition);
  // the lexicographically largest key belongs to the last element.
  rdd.mapPartitionsWithIndex(
    (i, iter) => iter.zipWithIndex.map { case (x, j) => ((i, j), x) }
  ).top(1)(Ordering[(Int, Int)].on(_._1)).headOption.map(_._2)
}

The former requires an additional Spark job for zipWithIndex (it has to count the elements in each partition first), while the latter doesn't.

Before using please be sure to understand the limitation. Quoting the docs:

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The unique ID assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.

In particular, depending on the exact input, union might not preserve the input order at all.
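
If your input is one of those RDDs, a hedged workaround suggested by the quoted docs is to pin the order with an explicit key before picking the last element (salesRdd and the timestamp field are hypothetical, standing in for whatever defines your record order):

// Sketch: make the order explicit before relying on an index-based "last"
val ordered = salesRdd           // hypothetical RDD[Sale] with unstable partition order
  .keyBy(_.timestamp)            // hypothetical orderable field on Sale
  .sortByKey()
  .values
val lastSale = lastValue(ordered)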

Upvotes: 3

Alexey Novakov

Reputation: 764

You could use zipWithIndex and sort descending by the index, so that the last record ends up on top, then take(1):

val lastSale: Array[Sale] = sc.union(salesListRdd) // flatten the List[RDD[Sale]] first
  .zipWithIndex()
  .map { case (sale, index) => (index, sale) }
  .sortByKey(ascending = false) // the last record has the highest index
  .map { case (_, sale) => sale }
  .take(1)
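
To get back to the question's Option[Any], a small hedged follow-up (headOption also handles an empty RDD):

// take(1) returns an Array, so headOption yields the Option the question asks for
val lastSaleId: Option[Any] = lastSale.headOption.map(_.saleId)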

The solution is taken from here: http://www.swi.com/spark-rdd-getting-bottom-records/ However, it is highly inefficient, since it does a lot of partition shuffling.

Upvotes: 1
