Leo
Leo

Reputation: 1770

How to filter a RDD according to a function based another RDD in Spark?

I am a beginner of Apache Spark. I want to filter out all groups whose sum of weight is larger than a constant value in a RDD. The "weight" map is also a RDD. Here is a small-size demo, the groups to be filtered is stored in "groups", the constant value is 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

When the input data is very large, > 10GB for example, I always encounter a "java heap out of memory" error. I doubted if it's caused by "weights.toArray.toMap", because it convert an distributed RDD to an Java object in JVM. So I tried to filter with RDD directly:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

When I ran result.collect after loading this script into spark shell, I got a "java.lang.NullPointerException" error. Someone told me when a RDD is manipulated in another RDD, there will be a nullpointer exception, and suggest me to put the weight into Redis.

So how can I get the "result" without convert "weight" to Map, or put it into Redis? If there is a solution to filter a RDD based on another map-like RDD without the help of external datastore service? Thanks!

Upvotes: 3

Views: 24687

Answers (2)

zhang zhan
zhang zhan

Reputation: 1596

Suppose your group is unique. Otherwise, first make it unique by distinct, etc. If group or weights is small, it should be easy. If both group and weights are huge, you can try this, which may be more scalable, but also looks complicated.

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
//map groups to be (a, (a,b,c,d)), (b, (a,b,c,d), (c, (a,b,c,d)....
val g1=groups.flatMap(s=>s.split(",").map(x=>(x, Seq(s))))
//j will be (a, ((a,b,c,d),3)...
val j = g1.join(weights)
//k will be ((a,b,c,d), 3), ((a,b,c,d),2) ...
val k = j.map(x=>(x._2._1, x._2._2))
//l will be ((a,b,c,d), (3,2,5,1))...
val l = k.groupByKey()
//filter by sum the 2nd
val m = l.filter(x=>{var sum = 0; x._2.foreach(a=> {sum=sum+a});sum > 12})
//we only need the original list
val result=m.map(x=>x._1)
//don't do this in real product, otherwise, all results go to driver.instead using saveAsTextFile, etc
scala> result.foreach(println)
List(e,g)
List(b,c,e)

Upvotes: 4

Shyamendra Solanki
Shyamendra Solanki

Reputation: 8851

The "java out of memory" error is coming because spark uses its spark.default.parallelism property while determining number of splits, which by default is number of cores available.

// From CoarseGrainedSchedulerBackend.scala

override def defaultParallelism(): Int = {
   conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

When the input becomes large, and you have limited memory, you should increase number of splits.

You can do something as follows:

 val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g") 
 val splitSize = 10000 // specify some number of elements that fit in memory.

 val numSplits = (input.size / splitSize) + 1 // has to be > 0.
 val groups = sc.parallelize(input, numSplits) // specify the # of splits.

 val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap

 def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
 val result = groups.filter(isHeavy)

You may also consider increasing executor memory size using spark.executor.memory.

Upvotes: 2

Related Questions