Joe

Reputation: 47609

Sorting iterable values in Spark

Let's say I have this input data:

["example.com", Date(2000, 1, 1)]: 100,
["example.com", Date(2000, 2, 1)]: 30,
["example.com", Date(2000, 3, 1)]: 5,
["xyz.com", Date(2000, 1, 1)]: 20,
["xyz.com", Date(2000, 2, 1)]: 10,
["xyz.com", Date(2000, 3, 1)]: 60

I want to group by the date and then, within each date, sort by the count (descending), giving me an ordered list of domains per date.

I want to end up with:

Date(2000, 1, 1), [["example.com", 100], ["xyz.com", 20]]
Date(2000, 2, 1), [["example.com", 30], ["xyz.com", 10]]
Date(2000, 3, 1), [["xyz.com", 60], ["example.com", 5]]

It seems like a normal use case but I can't see a way of doing this from the programming guide.

I can map [[domain, date] count] -> [date, [domain, count]]

which would give me (K, V) pairs

Date(2000, 1, 1), ["example.com", 100],
Date(2000, 2, 1), ["example.com", 30],
Date(2000, 3, 1), ["example.com", 5], 
Date(2000, 1, 1), ["xyz.com", 20],
Date(2000, 2, 1), ["xyz.com", 10],
Date(2000, 3, 1), ["xyz.com", 60]

then groupByKey, giving me (K, Iterable<V>) pairs

Date(2000, 1, 1), [["example.com", 100], ["xyz.com", 20]]
Date(2000, 2, 1), [["example.com", 30], ["xyz.com", 10]]
Date(2000, 3, 1), [["example.com", 5], ["xyz.com", 60]]

How can I then sort within the keys?

Please excuse the pseudocode: I'm using the Flambo Clojure wrapper, and I don't want to rewrite this in Java just to ask the question!

EDIT: Each Iterable (i.e. the list of domains) is probably going to be too large to fit in memory.

EDIT2: This is all pseudocode. I originally used month names to make this readable, but I've changed them to real dates for clarity.

Upvotes: 5

Views: 5392

Answers (1)

Sean Owen

Reputation: 66886

In broad strokes, I would do the following. (May not be 100% correct since I didn't compile it, but close.) I will assume for simplicity you start with an RDD[((String,String),Int)].

First, groupBy the month with something like:

.groupBy { case ((_, month), _) => month }

and get rid of month in the values:

.mapValues(_.map { case ((domain, _), count) => (domain, count) })
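These two steps can be tried out on plain Scala collections, whose `groupBy` and `mapValues` mirror the RDD API (a sketch with made-up sample data standing in for the asker's dataset):

```scala
// Sample data shaped like RDD[((String, String), Int)]: ((domain, month), count)
val counts = Seq(
  (("example.com", "January"), 100),
  (("example.com", "February"), 30),
  (("xyz.com", "January"), 20),
  (("xyz.com", "February"), 10)
)

// Group by month, then drop the month from each value,
// leaving month -> Seq((domain, count))
val byMonth = counts
  .groupBy { case ((_, month), _) => month }
  .mapValues(_.map { case ((domain, _), count) => (domain, count) })

// byMonth("January") contains ("example.com", 100) and ("xyz.com", 20)
```

On a real RDD the same two calls apply, but the grouping happens via a shuffle rather than in local memory.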

If ordering by month is desired, define the ordering of months:

def monthOfYear(month: String): Int =
  month match {
    case "January" => 1
    case "February" => 2
    ...
  }

and sort the RDD by month:

.sortBy { case (month, _) => monthOfYear(month) }

and sort the domains by count descending:

.mapValues(_.toSeq.sortBy{ case (domain, count) => count }(Ordering[Int].reverse))
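The same descending sort can be checked on plain Scala collections (a sketch, assuming the grouped `Map[String, Seq[(String, Int)]]` produced by the earlier steps):

```scala
// Grouped data: month -> unsorted (domain, count) pairs
val byMonth = Map(
  "January" -> Seq(("xyz.com", 20), ("example.com", 100)),
  "February" -> Seq(("example.com", 30), ("xyz.com", 10))
)

// Sort each month's pairs by count, descending, using a reversed Ordering
val sorted = byMonth.map { case (month, pairs) =>
  month -> pairs.sortBy { case (_, count) => count }(Ordering[Int].reverse)
}

// sorted("January") puts ("example.com", 100) before ("xyz.com", 20)
```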

That's direct and efficient but has the problem that all domain-count pairs for a month must fit in memory.

Instead you could start over by sorting by count descending:

.sortBy(_._2, ascending = false)

and then group by month. I haven't tested this, and I don't think the behavior is guaranteed, but I expect that in practice the elements will be encountered in order by count even after grouping.
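For illustration only, here is the sort-first variant on plain Scala collections, where `groupBy` does preserve encounter order within each group (Spark's `groupByKey`/`groupBy`, as noted above, make no such guarantee):

```scala
// Same shape as RDD[((String, String), Int)]: ((domain, month), count)
val counts = Seq(
  (("example.com", "January"), 100),
  (("xyz.com", "January"), 20),
  (("example.com", "February"), 30),
  (("xyz.com", "February"), 10)
)

// Sort all pairs by count descending first...
val grouped = counts
  .sortBy { case (_, count) => -count }
  // ...then group; each month's pairs come out already ordered
  .groupBy { case ((_, month), _) => month }
  .mapValues(_.map { case ((domain, _), count) => (domain, count) })

// grouped("February") keeps ("example.com", 30) ahead of ("xyz.com", 10)
```

In local collections this ordering always holds; on an RDD it is an implementation artifact of the shuffle, which is exactly the caveat in the answer.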

Upvotes: 6
