Reputation: 80
I am new to spark and I have two long running stages that are doing almost the same thing. Below is my pseudo code.
var metaData = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(csvFile)
val met = broadcast(metaData.dropDuplicates(Seq("col1")))
val accessLogs = sc.textFile(logFile).filter(line => regex.pattern.matcher(line).matches).map(line => LogParser.parseLogLine(line)).toDF()
val joinOutput = accessLogs.join(met,accessLogs("col1") === met("col1"),"left_outer")
val uniqueDfNames2 = Seq("col0", "col1", "col2", "col3","col4")
val sparseFilter = joinOutput
.filter(joinOutput.col("col1").isNotNull)
.filter(joinOutput.col("col2").isNotNull)
.flatMap(row=>ListParser.parseLogLine(row))
sparseFilter.cache()
val uniqueCount = sparseFilter
.filter{r=>r.col0 != null && r.col0 != "" }
.map{
case(KeyValParse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4,col5),1)
}
.distinct().cache()
.map {case ((col0,col1,col2,col3,col4),count) => ((col0,col1,col2,col3,col4),1)
}
.reduceByKey(_+_)
.map {case ((col0,col1,col2,col3,col4),count) => (col0,col1,col2,col3,col4,count)
}
.toDF(uniqueDfNames: _*).cache()
val totalCount = sparseFilter
.map{
case(Parse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4),1)
}
.reduceByKey(_+_)
.map{
case ((col0,col1,col2,col3,col4),totcount) => (col0,col1,col2,col3,col4,totcount)
}
.toDF(uniqueDfNames2: _*)
.join(uniqueCount,Seq("col0", "col1", "col2", "col3"),"left")
.select($"col0",$"col1",$"col2",$"col3",$"unicount",$"totcount")
.orderBy($"unicount".desc)
.toDF(totalDfNames: _*)
totalCount
.select("*")
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", "|")
.save(countPath)
What I am trying to do here is generating unique and totalcount from the logs based on some parameters.
Everything works fine but there are these two long running stages that share almost same DAG.
Below is the shot for both stages.
Please have a look at the screenshot of both the stages given below.
Until the flatmap task, they both do same thing. Why are these not merged into one stage? Why does stage 11 re-read the file again and do all the computations again is what I am not able to guess?
For a 20Gb of data with 10 executors (7 cores, 15Gb RAM) it is taking almost 30 minutes to complete, but I feel this can be reduced to quite a low time.
Any guidance would be appreciated.
PS:- Sorry for my image editing skills :)
Upvotes: 2
Views: 652
Reputation: 1751
RDDs are cached first time it is computed in an action. The first action in your code is "distinct", that is when the "sparseFilter" RDD is cached. So the first cache operation may not be useful for the subsequent stages. The first stage's output is a distinct RDD, but later you are referring to sparseFilter. So Spark has to recompute the RDD again.
I think the logic can be changed little differently. If I understood it correctly, both for totalCount and uniqueCount, the code uses the same set of columns (col0,col1,col2,col3,col4). So in the totalCount calculation, after the reduceByKey, a simple count should give the uniqueCount? The additional distinct, reduceByKey, join etc can be avoided this way.
Upvotes: 1