pferrel

Spark splitting a DStream into several RDDs

The same question also applies to splitting an RDD into several new RDDs.

A DStream or RDD contains instances of several different case classes, and I need to turn them into separate RDDs based on case class type.

I'm aware of

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }

or

val newRDD = rdd.filter { 
  a => a match { 
    case _: CC1 => true
    case _ => false
  }
}

But this requires many runs through the original RDD, one per case class type.

  1. Is there a more concise way to write the above matching filter?
  2. Is there a way to split an rdd into several by the element type with one parallel pass?

Answers (2)

maasg

1) A more concise way of filtering for a given type is to use rdd.collect(PartialFunction[T,U])

The equivalent of

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }

would be:

val newRDD = rdd.collect{case c:CaseClass1 => c}

It could even be combined with additional filtering and transformation:

val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget}

rdd.collect(p: PartialFunction[T,U]) should not be confused with rdd.collect(), which returns all of the RDD's elements to the driver.
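A minimal sketch of the same idea that runs without a cluster: Scala's standard `Seq.collect` takes a `PartialFunction` in the same way as `RDD.collect(PartialFunction[T,U])`, so a local `Seq` stands in for the RDD here. `CaseClass2`, the `Event` trait and the `budget` field (in place of `getBudget`) are hypothetical.

```scala
// Hypothetical element types standing in for the case classes inside the RDD.
sealed trait Event
case class CaseClass1(customer: String, budget: Int) extends Event
case class CaseClass2(id: Long) extends Event

// A local Seq standing in for the RDD (with Spark: sc.parallelize(events)).
val events: Seq[Event] = Seq(
  CaseClass1("important", 100),
  CaseClass2(1L),
  CaseClass1("other", 50),
  CaseClass2(2L)
)

// filter (here with a type test instead of getSimpleName) keeps the static
// element type Event, so a cast would still be needed downstream.
val filtered: Seq[Event] = events.filter(_.isInstanceOf[CaseClass1])

// collect selects and narrows the type in one step: the result is Seq[CaseClass1].
val onlyCC1: Seq[CaseClass1] = events.collect { case c: CaseClass1 => c }

// A guard and a transformation can live in the same partial function.
val budgets: Seq[Int] = events.collect {
  case c: CaseClass1 if c.customer == "important" => c.budget
}
```

Beyond brevity, the typed result is the main practical difference: downstream code can use `CaseClass1` fields without casting.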


2) To split an RDD (or a DStream, for that matter), filter is the way to go. Remember that an RDD is a distributed collection: filter applies a predicate to every element of that collection, in parallel across the cluster, and keeps only the matching subset.

Producing two or more RDDs from one original RDD in a single structural step would incur a one-to-many shuffle stage, which would be substantially more expensive than a few filter passes.
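A sketch of the split-by-filter approach, again on a local `Seq` so it runs without Spark: one `collect` per type, each a separate pass over the parent. On a real RDD the parent would typically be cached (`rdd.cache()`) before the passes, so each pass reads cached partitions instead of recomputing the parent's lineage; that step only appears as a comment here. The `Record`, `Click` and `View` types are hypothetical.

```scala
// Hypothetical record types mixed together in one collection.
sealed trait Record
case class Click(page: String) extends Record
case class View(page: String, seconds: Int) extends Record

// A local Seq standing in for the parent RDD. With Spark, call parent.cache()
// first so the passes below do not each recompute the parent.
val parent: Seq[Record] =
  Seq(Click("home"), View("home", 5), Click("docs"), View("faq", 12))

// One pass per output type; each collect selects and narrows one case class.
val clicks: Seq[Click] = parent.collect { case c: Click => c }
val views: Seq[View] = parent.collect { case v: View => v }
```

Each pass is a narrow transformation (no shuffle), which is why several of them can be cheaper than one structural split.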

pferrel

It looks like I was on the right track with the long form of rdd.filter. A slightly more concise version is:

val newRDD = rdd.filter { case _: CC1 => true ; case _ => false }

You can't leave out the case _ => false: without it the match is not exhaustive, and any element that is not a CC1 throws a scala.MatchError at runtime. I couldn't get collect to work correctly.
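For comparison, a small sketch on plain Scala collections (the `Msg`, `CC1` and `CC2` definitions are hypothetical) showing that the pattern-matching filter, the shorter `isInstanceOf` predicate and `collect` select the same elements, and that dropping the default case fails at runtime rather than at compile time.

```scala
import scala.util.Try

// Hypothetical element types; the trait is left unsealed, as a mixed RDD's
// element type often is, so the compiler does not check exhaustiveness.
trait Msg
case class CC1(n: Int) extends Msg
case class CC2(s: String) extends Msg

val msgs: Seq[Msg] = Seq(CC1(1), CC2("a"), CC1(2))

// The pattern-matching filter, with the required default case.
val viaMatch: Seq[Msg] = msgs.filter { case _: CC1 => true; case _ => false }

// An equivalent, shorter predicate.
val viaIsInstanceOf: Seq[Msg] = msgs.filter(_.isInstanceOf[CC1])

// collect selects the same elements and also narrows the type to Seq[CC1].
val viaCollect: Seq[CC1] = msgs.collect { case c: CC1 => c }

// Without the default case, CC2("a") throws a MatchError. On an RDD, filter is
// lazy, so the error would only surface once an action runs.
val withoutDefault: Try[Seq[Msg]] = Try(msgs.filter { case _: CC1 => true })
```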

@maasg gets credit for the right answer about doing separate filter passes rather than hacking a way to split input in one pass.
