Reputation: 47609
Let's say I have this input data:
[["example.com", Date(2000, 1, 1)]: 100,
["example.com", Date(2000, 2, 1)]: 30,
["example.com", Date(2000, 3, 1)]: 5,
["xyz.com", Date(2000, 1, 1)]: 20,
["xyz.com", Date(2000, 2, 1)]: 10,
["xyz.com", Date(2000, 3, 1)]: 60]
I want to group by the date (descending) and then sort by the count, giving me an ordered list of domains per date.
I want to end up with:
Date(2000, 1, 1), [["example.com", 100], ["xyz.com", 20]]
Date(2000, 2, 1), [["example.com", 30], ["xyz.com", 10]]
Date(2000, 3, 1), [["xyz.com", 60], ["example.com", 5]]
It seems like a normal use case but I can't see a way of doing this from the programming guide.
I can map
[[domain, date], count] -> [date, [domain, count]]
which would give me (K, V) pairs:
Date(2000, 1, 1), ["example.com", 100],
Date(2000, 2, 1), ["example.com", 30],
Date(2000, 3, 1), ["example.com", 5],
Date(2000, 1, 1), ["xyz.com", 20],
Date(2000, 2, 1), ["xyz.com", 10],
Date(2000, 3, 1), ["xyz.com", 60]
then groupByKey, giving me (K, Iterable<V>) pairs:
[Date(2000, 1, 1), [["example.com", 100], ["xyz.com", 20]]]
[Date(2000, 2, 1), [["example.com", 30], ["xyz.com", 10]]]
[Date(2000, 3, 1), [["example.com", 5], ["xyz.com", 60]]]
How can I then sort within the keys?
Please excuse the pseudocode, I'm using the Flambo Clojure wrapper and I don't want to rewrite it in Java just to ask this question!
EDIT: Each Iterable (i.e. the list of domains) is probably going to be too large to fit in memory.
EDIT2: This is all pseudocode. I originally used month names to make this readable, but I've changed them to real dates for clarity.
Upvotes: 5
Views: 5392
Reputation: 66886
In broad strokes, I would do the following. (It may not be 100% correct since I didn't compile it, but it should be close.) For simplicity, I will assume you start with an RDD[((String,String),Int)], that is, ((domain, month), count).
First, groupBy the month with something like:
.groupBy { case ((_, month), _) => month }
and get rid of month in the values:
.mapValues(_.map { case ((domain, _), count) => (domain, count) })
If ordering by month is desired, define the ordering of months:
def monthOfYear(month: String): Int =
  month match {
    case "January" => 1
    case "February" => 2
    ...
  }
and sort the RDD by month:
.sortBy { case (month, _) => monthOfYear(month) }
and sort the domains by count descending:
.mapValues(_.toSeq.sortBy{ case (domain, count) => count }(Ordering[Int].reverse))
That's direct and efficient but has the problem that all domain-count pairs for a month must fit in memory.
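As a rough end-to-end sketch of the steps above, the snippet below chains them on the sample data from the question. It assumes dates are ISO-8601 strings in place of month names, so plain string ordering is chronological and the monthOfYear lookup is not needed. The object name GroupAndSortInMemory, the rank helper, and the local[2] master are made up for illustration. Like the steps it mirrors, it still needs each date's pairs to fit in memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupAndSortInMemory {
  // Sample data from the question, shaped as ((domain, date), count).
  // Assumption: dates are ISO-8601 strings, so String ordering is chronological.
  val sample: Seq[((String, String), Int)] = Seq(
    (("example.com", "2000-01-01"), 100),
    (("example.com", "2000-02-01"), 30),
    (("example.com", "2000-03-01"), 5),
    (("xyz.com", "2000-01-01"), 20),
    (("xyz.com", "2000-02-01"), 10),
    (("xyz.com", "2000-03-01"), 60)
  )

  // Hypothetical helper: group by date, order dates ascending, and order
  // each date's (domain, count) pairs by count descending.
  def rank(sc: SparkContext,
           data: Seq[((String, String), Int)]): Array[(String, Seq[(String, Int)])] =
    sc.parallelize(data)
      .groupBy { case ((_, date), _) => date }
      // drop the date from the grouped values
      .mapValues(_.map { case ((domain, _), count) => (domain, count) })
      .sortBy { case (date, _) => date }
      // each group is materialized here, so it must fit in memory
      .mapValues(_.toSeq.sortBy { case (_, count) => count }(Ordering[Int].reverse))
      .collect()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("group-and-sort")
    val sc = new SparkContext(conf)
    try {
      rank(sc, sample).foreach { case (date, domains) =>
        println(s"$date -> ${domains.mkString(", ")}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

The sort by date runs before the per-group sort so that the final mapValues, which does not reshuffle, leaves the date order intact.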
Instead you could start over by sorting by count descending:
.sortBy(p => p._2, false)
and then group by month. I haven't tested this, and I don't think the behavior is guaranteed, but I expect that in practice the elements will be encountered in order by count even after grouping.
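If the ordering has to be guaranteed and a date's list may not fit in memory, a different technique from the two above is the "secondary sort" pattern: move the count into the key, partition on the date alone, and let the shuffle sort each partition with repartitionAndSortWithinPartitions. The sketch below is untested against a cluster and rests on some assumptions: dates are ISO-8601 strings, and DatePartitioner, DateThenCountDesc, and sortedWithinDates are hypothetical names introduced only for this example.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical partitioner: routes a composite (date, count) key by its date
// only, so every record for a given date lands in the same partition.
class DatePartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (date: String, _) => Math.floorMod(date.hashCode, numPartitions)
  }
}

// Hypothetical ordering: date ascending, then count descending.
object DateThenCountDesc extends Ordering[(String, Int)] {
  override def compare(a: (String, Int), b: (String, Int)): Int = {
    val byDate = a._1.compareTo(b._1)
    if (byDate != 0) byDate else Integer.compare(b._2, a._2)
  }
}

object SecondarySort {
  // Input is ((domain, date), count); output is ((date, count), domain),
  // sorted within each partition by DateThenCountDesc.
  def sortedWithinDates(input: RDD[((String, String), Int)],
                        partitions: Int): RDD[((String, Int), String)] = {
    // A local implicit takes precedence over the default tuple ordering.
    implicit val ord: Ordering[(String, Int)] = DateThenCountDesc
    input
      .map { case ((domain, date), count) => ((date, count), domain) }
      .repartitionAndSortWithinPartitions(new DatePartitioner(partitions))
  }
}
```

Each partition then holds whole dates as contiguous runs, already ordered by count descending, so a mapPartitions pass can stream through one date at a time without building the full list.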
Upvotes: 6