Reputation: 5702
The same question also applies to splitting an RDD into several new RDDs.
A DStream or RDD contains several different case classes and I need to turn them into separate RDDs based on case class type.
I'm aware of
val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
or
val newRDD = rdd.filter {
  a => a match {
    case _: CC1 => true
    case _ => false
  }
}
But this requires many runs through the original RDD, one per case class type.
Upvotes: 3
Views: 2909
Reputation: 37435
1) A more concise way of filtering for a given type is to use rdd.collect(PartialFunction[T,U])
The equivalent of
val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
would be:
val newRDD = rdd.collect{case c:CaseClass1 => c}
It could even be combined with additional filtering and transformation:
val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget}
Note that rdd.collect(p: PartialFunction[T,U]) should not be confused with the parameterless rdd.collect(), which delivers the data back to the driver.
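For instance, a small sketch of the collect-based split, assuming sc is an existing SparkContext (as in spark-shell) and using a made-up Event hierarchy rather than the question's classes:

// Hypothetical classes, only for illustration.
sealed trait Event
case class Click(user: String, url: String) extends Event
case class Purchase(user: String, amount: Double) extends Event

val events = sc.parallelize(Seq[Event](
  Click("alice", "/home"),
  Purchase("bob", 42.0),
  Click("bob", "/checkout")
))

// collect(PartialFunction) filters and downcasts in one step,
// so the result is an RDD[Click] rather than an RDD[Event].
val clicks    = events.collect { case c: Click => c }
val purchases = events.collect { case p: Purchase => p }

clicks.count()     // 2
purchases.count()  // 1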
2) To split an RDD (or a DStream, for that matter), filter is the way to go. Remember that an RDD is a distributed collection: filter evaluates a predicate against every element, in parallel across the cluster, and yields the subset that satisfies it.
A structural split of one RDD into 2 or more RDDs in a single pass would incur a 1-to-many shuffle stage, which is substantially more expensive.
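As a sketch of the filter-based split, reusing the hypothetical Event classes above; caching the source RDD is my own addition here, so each pass reads from memory instead of recomputing the lineage:

// Cache the source so each filter pass reuses the same partitions
// instead of recomputing the full lineage per case class type.
val cached = events.cache()

val clickRDD    = cached.filter(_.isInstanceOf[Click])
val purchaseRDD = cached.filter(_.isInstanceOf[Purchase])

// Both RDDs are evaluated lazily; each action below runs its
// filter in parallel over the cached partitions.
clickRDD.count()
purchaseRDD.count()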
Upvotes: 4
Reputation: 5702
Looks like I was on the right track with the long rdd.filter form. A slightly more concise version is:
val newRDD = rdd.filter { case _: CC1 => true; case _ => false }
You can't leave out the case _ => false, or the match is not exhaustive and you'll get a MatchError at runtime for any element that isn't a CC1. I couldn't get collect to work correctly.
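To illustrate the point (a sketch, assuming rdd contains CC1 among other case class types):

// Without the catch-all arm, any element that is not a CC1 throws
// scala.MatchError once an action evaluates the RDD:
// val bad = rdd.filter { case _: CC1 => true }

// Safe variants:
val viaMatch      = rdd.filter { case _: CC1 => true; case _ => false }
val viaInstanceOf = rdd.filter(_.isInstanceOf[CC1])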
@maasg gets credit for the right answer about doing separate filter passes rather than hacking a way to split input in one pass.
Upvotes: 1