Reputation: 53
My Spark program has got several table joins(using SPARKSQL) and I would like to collect the time taken to process each of those joins and save to a statistics table. The purpose is to run it continuously over a period of time and gather the performance at very granular level.
e.g
val DF1= spark.sql("select x,y from A,B ")
Val DF2 =spark.sql("select k,v from TABLE1,TABLE2 ")
finally I join DF1 and DF2 and then initiate an action like saveAsTable .
What I am looking for is to figure out
1.How much time it really took to compute DF1
2.How much time to compute DF2 and
3.How much time to persist those final Joins to Hive / HDFS
and put all these info to a RUN-STATISTICS table / file.
Any help is appreciated and thanks in advance
Upvotes: 2
Views: 2166
Reputation: 786
Spark uses Lazy Evaluation, allowing the engine to optimize RDD transformations at a very granular level.
When you execute
val DF1= spark.sql("select x,y from A,B ")
nothing happens except the transformation is added to the Directed Acyclic Graph.
Only when you perform an Action, such as DF1.count, the driver is forced to execute a physical execution plan. This is deferred as far down the chain of RDD transformations as possible.
Therefore it is not correct to ask
1.How much time it really took to compute DF1
2.How much time to compute DF2 and
at least based on the code examples you provided. Your code did not "compute" val DF1. We may not know how long processing just DF1 took, unless you somehow tricked the compiler into processing each dataframe separately.
A better way to structure the question might be "how many stages (tasks) is my job divided into overall, and how long does it take to finish those stages (tasks)"?
And this can be easily answered by looking at the log files/web GUI timeline (comes in different flavors depending on your setup)
3.How much time to persist those final Joins to Hive / HDFS
Fair question. Check out Ganglia
Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.
Another trick I like to use it defining every sequence of transformations that must end in an action inside a separate function, and then calling that function on the input RDD inside a "timer function" block.
For instance, my "timer" is defined as such
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0)/1e9 + "s")
result
}
and can be used as
val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")
scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2
However don't call unnecessary actions just to break down the DAG into more stages/wide dependencies. This might lead to shuffles or slow down your execution.
Resources:
https://spark.apache.org/docs/latest/monitoring.html
http://ganglia.sourceforge.net/
https://www.youtube.com/watch?v=49Hr5xZyTEA
Upvotes: 3