yeyimilk

Reputation: 614

Spark RDD.isEmpty costs too much time

I built a Spark cluster:
Workers: 2
Cores: 12
Memory: 32.0 GB total, 20.0 GB used
Each worker gets 1 CPU, 6 cores and 10.0 GB of memory.

My program reads its data from a MongoDB cluster. The Spark and MongoDB clusters are on the same LAN (1000 Mbps). MongoDB document format: {name: string, value: double, time: ISODate}

There are about 13 million documents.
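
For context, here is a minimal sketch of how such a source RDD is typically created with the mongo-hadoop connector; the URI and application name are illustrative, not taken from the actual setup:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SparkConf, SparkContext}
    import org.bson.BSONObject
    import com.mongodb.hadoop.MongoInputFormat

    val sc = new SparkContext(new SparkConf().setAppName("mongo-average"))

    val configOriginal = new Configuration()
    // hypothetical connection string; point it at the real MongoDB cluster/collection
    configOriginal.set("mongo.input.uri", "mongodb://mongo-host:27017/mydb.measurements")

    // yields an RDD[(Object, BSONObject)], as used by the function below
    val rdd = sc.newAPIHadoopRDD(
      configOriginal,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])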

I want to get the average value for a specific name over a specific hour, which contains about 60 documents. Here is my key function:

/*
 * rdd = sc.newAPIHadoopRDD(configOriginal, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
 * Apache-Spark-1.3.1 Scala doc: SparkContext.newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
 */
def findValueByNameAndRange(rdd: RDD[(Object, BSONObject)], name: String, time: Date): RDD[BasicBSONObject] = {

  // keep only the documents with the requested name
  val nameRdd = rdd.map(_._2).filter(_.get("name").equals(name))
  // pair each document with its timestamp
  val timeRangeRdd1 = nameRdd.map(doc => (doc, doc.get("time").asInstanceOf[Date]))
  // flag documents that fall into the hour before `time`
  val timeRangeRdd2 = timeRangeRdd1.map(tuple => (tuple._1, duringTime(tuple._2, time, getHourAgo(time, 1))))
  // keep only the flagged documents
  val timeRangeRdd3 = timeRangeRdd2.filter(_._2).map(_._1)
  // sum the values per name
  val timeRangeRdd4 = timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)

  if (timeRangeRdd4.isEmpty()) {
    basicBSONRDD(name, time)
  } else {
    timeRangeRdd4.map(tuple => {
      val bson = new BasicBSONObject()
      bson.put("name", tuple._1)
      bson.put("value", tuple._2 / 60)
      bson.put("time", time)
      bson
    })
  }
}

Here is part of the job information (Spark UI screenshots):

My program runs very slowly. Is it because of isEmpty and reduceByKey? If yes, how can I improve it? If not, why?
======= Update =======

 timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)

is on line 34.


I know reduceByKey is a global operation and may take a lot of time, but what it costs here is beyond my budget. How can I improve it, or is it a defect of Spark? With the same calculation and hardware, it takes just a few seconds if I use multiple Java threads.
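
For comparison, here is a rough sketch of the kind of single-machine baseline meant above, written with Scala parallel collections over an already-loaded in-memory collection; the Doc case class, the docs argument and the helper name are hypothetical:

    import java.util.Date

    // Hypothetical in-memory representation of the MongoDB documents.
    case class Doc(name: String, value: Double, time: Date)

    // Average the values of the matching name in [from, to),
    // assuming roughly 60 documents per hour, as in the Spark version.
    def averageLocally(docs: Seq[Doc], name: String, from: Date, to: Date): Double =
      docs.par
        .filter(d => d.name == name && !d.time.before(from) && d.time.before(to))
        .map(_.value)
        .sum / 60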

Upvotes: 3

Views: 3695

Answers (1)

Justin Pihony

Reputation: 67135

First, isEmpty is merely the point at which the RDD stage ends. The maps and filters do not create a need for a shuffle, and the method shown in the UI is always the method that triggers a stage change/shuffle...in this case isEmpty. Why it's running slowly is not as easy to discern from this perspective, especially without seeing the composition of the originating RDD. I can tell you that isEmpty first checks the partition size and then does a take(1) and verifies whether data was returned or not. So, the odds are that there is a bottleneck in the network or something else blocking along the way. It could even be a GC issue... Click into the isEmpty stage and see what more you can discern from there.
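
For reference, a minimal sketch of the check described above, assuming a Spark 1.3-style RDD API (the helper name isEmptySketch is mine, not part of Spark):

    import org.apache.spark.rdd.RDD

    // Approximates what RDD.isEmpty does: no partitions means empty,
    // otherwise try to pull a single element and see if anything came back.
    // The take(1) is what actually runs the preceding maps/filters/shuffle.
    def isEmptySketch[T](rdd: RDD[T]): Boolean =
      rdd.partitions.length == 0 || rdd.take(1).length == 0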

Upvotes: 4
