Reputation: 614
I built a Spark cluster
.
workers:2
Cores:12
Memory: 32.0 GB Total, 20.0 GB Used
Each worker gets 1 cpu, 6 cores and 10.0 GB memory
My program gets data source from MongoDB cluster
. Spark
and MongoDB cluster
are in the same LAN
(1000Mbps).
MongoDB document
format:
{name:string, value:double, time:ISODate}
There is about 13 million documents.
I want to get the average value of a special name
from a special hour which contains 60 documents.
Here is my key function
/*
*rdd=sc.newAPIHadoopRDD(configOriginal, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
Apache-Spark-1.3.1 scala doc: SparkContext.newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
*/
def findValueByNameAndRange(rdd:RDD[(Object,BSONObject)],name:String,time:Date): RDD[BasicBSONObject]={
val nameRdd = rdd.map(arg=>arg._2).filter(_.get("name").equals(name))
val timeRangeRdd1 = nameRdd.map(tuple=>(tuple, tuple.get("time").asInstanceOf[Date]))
val timeRangeRdd2 = timeRangeRdd1.map(tuple=>(tuple._1,duringTime(tuple._2,time,getHourAgo(time,1))))
val timeRangeRdd3 = timeRangeRdd2.filter(_._2).map(_._1)
val timeRangeRdd4 = timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)
if(timeRangeRdd4.isEmpty()){
return basicBSONRDD(name, time)
}
else{
return timeRangeRdd4.map(tuple => {
val bson = new BasicBSONObject()
bson.put("name", tuple._1)
bson.put("value", tuple._2/60)
bson.put("time", time)
bson })
}
}
Here is part of Job
information
My program works so slowly. Does it because of isEmpty
and reduceByKey
? If yes, how can I improve it ? If not, why?
=======update ===
timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)
is on the line of 34
I know reduceByKey is a global operation, and may costs much time, however, what it costed is beyond my budget. How can I improvet it or it is the defect of Spark. With the same calculation and hardware, it just costs several seconds if I use multiple thread of java.
Upvotes: 3
Views: 3695
Reputation: 67135
First, isEmpty
is merely the point at which the RDD stage ends. The map
s and filter
s do not create a need for a shuffle, and the method used in the UI is always the method that triggers a stage change/shuffle...in this case isEmpty
. Why it's running slow is not as easy to discern from this perspective, especially without seeing the composition of the originating RDD
. I can tell you that isEmpty
first checks the partition
size and then does a take(1)
and verifies whether data was returned or not. So, the odds are that there is a bottle neck in the network or something else blocking along the way. It could even be a GC issue... Click into the isEmpty
and see what more you can discern from there.
Upvotes: 4