Reputation: 60062
Suppose I have an RDD (50M records/day) which I want to summarize in several different ways.
The RDD records are 4-tuples: (keep, foo, bar, baz)

- keep - boolean
- foo, bar, baz - 0/1 int

I want to count how many of each of the foo &c are kept and dropped, i.e., I have to do the following for foo (and the same for bar and baz):
rdd.filter(lambda rec: rec[1] == 1) \
   .map(lambda rec: (rec[0], 1)) \
   .reduceByKey(operator.add)
which would return (after collect) a list like [(True,40000000),(False,10000000)].
The question is: is there an easy way to avoid scanning rdd 3 times (once for each of foo, bar, baz)?
What I mean is not a way to rewrite the above code to handle all 3 fields, but telling Spark to process all 3 pipelines in a single pass.
Upvotes: 2
Views: 651
Reputation: 37435
It's possible to execute the three pipelines in parallel by submitting the job with different threads, but this will pass through the RDD three times and require up to 3x more resources on the cluster.
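The threading approach can be sketched in plain Python (not Spark; a hypothetical `count_kept` function stands in for each `filter/map/reduceByKey` pipeline, and a plain list stands in for the RDD). Each submitted job still scans the whole dataset, which is the point being made above:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for one pipeline: with Spark this would be
# rdd.filter(...).map(...).reduceByKey(...).collect() on the shared RDD.
def count_kept(records, field):
    kept = sum(1 for rec in records if rec[field] == 1 and rec[0])
    dropped = sum(1 for rec in records if rec[field] == 1 and not rec[0])
    return [(True, kept), (False, dropped)]

# Toy data in the question's (keep, foo, bar, baz) shape.
records = [(True, 1, 0, 1), (False, 1, 1, 0), (True, 0, 1, 1)]

# Submit the three jobs from different threads; each one scans `records`.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(count_kept, records, f) for f in (1, 2, 3)]
    foo, bar, baz = [f.result() for f in futures]
```

With a real SparkContext the three actions would run concurrently on the cluster, but the dataset is still read three times.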
It's possible to get the job done in one pass by rewriting the job to handle all counts at once - the answer regarding aggregate is an option. Splitting the data into pairs (keep, foo), (keep, bar), (keep, baz) would be another.
It's not possible to get the job done in one pass without any code changes, as there would be no way for Spark to know that those jobs relate to the same dataset. At most, the speed of subsequent jobs after the first one could be improved by caching the initial rdd with rdd.cache before the .filter().map().reduce() steps; this will still pass through the RDD 3 times, but the 2nd and 3rd time will be potentially a lot faster if all data fits in the memory of the cluster:
rdd.cache
// first reduceByKey action will trigger the cache and rdd data will be kept in memory
val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
// subsequent operations will execute faster as the rdd is now available in mem
val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)
If I were doing this, I would create pairs of the relevant data and count them in a single pass:
// We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz")
// and the keep flag. dataPairs will contain data like: (("bar",true),1), (("foo",false),1)
val dataPairs = rdd.flatMap { case (keep, foo, bar, baz) =>
  def condPair(name: String, x: Int): Option[((String, Boolean), Int)] =
    if (x == 1) Some(((name, keep), x)) else None
  Seq(condPair("foo", foo), condPair("bar", bar), condPair("baz", baz)).flatten
}
val totals = dataPairs.reduceByKey(_ + _)
This is easy and will pass over the data only once, but requires rewriting the code. I'd say it scores 66.66% in answering the question.
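The same pairing idea can be sketched in plain Python, simulating flatMap plus reduceByKey with a Counter (not Spark; the toy `records` list is an assumption standing in for the RDD):

```python
from collections import Counter

# Toy data in the question's (keep, foo, bar, baz) shape.
records = [(True, 1, 0, 1), (False, 1, 1, 0), (True, 0, 1, 1)]

# "flatMap": emit a ((name, keep), 1) pair for each field that is set.
# "reduceByKey": the Counter sums the 1s per (name, keep) key.
totals = Counter()
for keep, foo, bar, baz in records:
    for name, x in (("foo", foo), ("bar", bar), ("baz", baz)):
        if x == 1:
            totals[(name, keep)] += 1
```

Each input record is visited exactly once, which is what makes this a single-pass solution.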
Upvotes: 3
Reputation: 1581
If I'm reading your question correctly, you want RDD.aggregate.
val zeroValue = (0L, 0L, 0L, 0L, 0L, 0L) // tfoo, tbar, tbaz, ffoo, fbar, fbaz
rdd.aggregate(zeroValue)(
  (prior, current) => if (current._1) {
    (prior._1 + current._2, prior._2 + current._3, prior._3 + current._4,
     prior._4, prior._5, prior._6)
  } else {
    (prior._1, prior._2, prior._3,
     prior._4 + current._2, prior._5 + current._3, prior._6 + current._4)
  },
  (left, right) =>
    (left._1 + right._1, left._2 + right._2, left._3 + right._3,
     left._4 + right._4, left._5 + right._5, left._6 + right._6)
)
Aggregate is conceptually like the reduce function on a list, but RDDs aren't lists; they're distributed. So you provide two function arguments: one to operate on each partition, and one to combine the results of processing the partitions.
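The two-function structure of aggregate can be sketched in plain Python, folding each partition with the sequence operator and then merging the per-partition results with the combine operator (not Spark; the hand-built `partitions` list is an assumption standing in for the RDD's partitioning):

```python
from functools import reduce

zero = (0, 0, 0, 0, 0, 0)  # tfoo, tbar, tbaz, ffoo, fbar, fbaz

# Sequence operator: fold one record into the running per-partition totals.
def seq_op(acc, rec):
    keep, foo, bar, baz = rec
    if keep:
        return (acc[0] + foo, acc[1] + bar, acc[2] + baz, acc[3], acc[4], acc[5])
    return (acc[0], acc[1], acc[2], acc[3] + foo, acc[4] + bar, acc[5] + baz)

# Combine operator: merge the totals of two partitions element-wise.
def comb_op(left, right):
    return tuple(l + r for l, r in zip(left, right))

# Toy data split into two "partitions" of (keep, foo, bar, baz) records.
partitions = [
    [(True, 1, 0, 1), (False, 1, 1, 0)],
    [(True, 0, 1, 1)],
]

per_partition = [reduce(seq_op, part, zero) for part in partitions]
result = reduce(comb_op, per_partition, zero)
```

Spark applies `seq_op` within each partition in parallel and then uses `comb_op` to merge the partial results, exactly as the two reduce calls do here in sequence.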
Upvotes: 1