Daniel Imberman

Reputation: 628

Reducing potentially empty RDDs

So I'm running into an issue where a filter I'm using on an RDD can potentially produce an empty RDD. I feel that calling count() just to test for emptiness would be very expensive, and I was wondering if there is a more performant way to handle this situation.

Here is an example of what this issue might look like:

    val b: RDD[String] = sc.parallelize(Seq("a", "ab", "abc"))
    println(b.filter(a => !a.contains("a")).reduce(_ + _))

would give the result:

java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1005)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)

Does anyone have any suggestions for how I should go about addressing this edge case?

Upvotes: 8

Views: 3892

Answers (2)

Roberto Congiu

Reputation: 5213

How about isEmpty?

scala> val b = sc.parallelize(Seq("a","ab","abc"))
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> b.isEmpty
res1: Boolean = false
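
In the question's setting, a minimal sketch of guarding the reduce this way (the filtered and result names are illustrative; note that isEmpty itself triggers a small Spark job, so caching avoids traversing the RDD twice):

    // Sketch, assuming a Spark shell with sc in scope and b defined as in
    // the question. Guard the reduce with isEmpty so the empty case is
    // handled explicitly instead of throwing.
    val filtered = b.filter(a => !a.contains("a")).cache() // both isEmpty and reduce evaluate it
    val result = if (filtered.isEmpty()) "" else filtered.reduce(_ + _)
    println(result)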

Upvotes: 1

Dima

Reputation: 40500

Consider .fold("")(_ + _) instead of .reduce(_ + _). Unlike reduce, fold takes a zero value, so on an empty RDD it simply returns that value instead of throwing.
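
A minimal sketch against the question's example, assuming a Spark shell with sc in scope:

    // fold supplies a neutral element, so an empty RDD yields ""
    // rather than throwing UnsupportedOperationException.
    val b = sc.parallelize(Seq("a", "ab", "abc"))
    println(b.filter(a => !a.contains("a")).fold("")(_ + _)) // prints an empty string

The zero value must be a neutral element for the operation; here the empty string works, since s + "" == s for string concatenation.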

Upvotes: 13
