Nina A

Reputation: 53

What is the best way to collect Spark job run statistics and save them to a database?

My Spark program has several table joins (using Spark SQL), and I would like to collect the time taken to process each of those joins and save it to a statistics table. The purpose is to run it continuously over a period of time and gather performance data at a very granular level.

e.g.

val DF1 = spark.sql("select x, y from A, B")

val DF2 = spark.sql("select k, v from TABLE1, TABLE2")

Finally, I join DF1 and DF2 and then initiate an action such as saveAsTable.
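Something like this, where the join condition and output table name are only placeholders:

val joined = DF1.join(DF2, DF1("x") === DF2("k"))   // placeholder join condition
joined.write.mode("overwrite").saveAsTable("joined_output")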

What I am looking for is to figure out:

1. How much time it really took to compute DF1
2. How much time it took to compute DF2, and
3. How much time it took to persist those final joins to Hive / HDFS

and put all of this information into a RUN-STATISTICS table / file.

Any help is appreciated. Thanks in advance.

Upvotes: 2

Views: 2166

Answers (1)

maverik

Reputation: 786

Spark uses Lazy Evaluation, allowing the engine to optimize RDD transformations at a very granular level.

When you execute

val DF1 = spark.sql("select x, y from A, B")

nothing happens except that the transformation is added to the Directed Acyclic Graph (DAG).

Only when you perform an action, such as DF1.count, is the driver forced to build and execute a physical plan. Execution is deferred as far down the chain of RDD transformations as possible.
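You can see this for yourself in the spark-shell. A minimal sketch (assuming the usual spark session, and unrelated to your tables):

// Nothing below runs a job until show() is called.
val df = spark.range(0, 1000000).selectExpr("id", "id % 10 as bucket")   // lazy
val grouped = df.groupBy("bucket").count()                               // still lazy

grouped.explain()   // prints the physical plan without executing it
grouped.show()      // an action: only now does the plan actually run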

Therefore it is not correct to ask

1. How much time it really took to compute DF1
2. How much time it took to compute DF2

at least not based on the code examples you provided. Your code did not "compute" DF1. There is no way to know how long processing just DF1 took, unless you force Spark to materialize each DataFrame separately (as sketched below).
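If you really do need a per-DataFrame number, one workaround is to cache each DataFrame and force it with an action. This is only a sketch, and it changes the execution plan, so treat the numbers as approximate:

// Sketch: force DF1 to be materialized on its own so it can be timed separately.
val DF1 = spark.sql("select x, y from A, B").cache()
val start = System.nanoTime()
DF1.count()   // action: fills the cache
println(s"DF1 materialized in ${(System.nanoTime() - start) / 1e9} s")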

A better way to structure the question might be "how many stages (tasks) is my job divided into overall, and how long does it take to finish those stages (tasks)"?

And this can easily be answered by looking at the log files or the Spark web UI timeline (which comes in different flavors depending on your setup). If you want to collect it programmatically, see the listener sketch below.
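One option, a sketch based on the monitoring/listener APIs (writing to your table is left to you), is to register a SparkListener and record each stage's duration; here it only prints:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Sketch: report the duration of every completed stage. Replace println with an
// insert into your RUN-STATISTICS table if you want to persist the numbers.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
    val info = stage.stageInfo
    for (start <- info.submissionTime; end <- info.completionTime) {
      println(s"Stage ${info.stageId} (${info.name}) took ${(end - start) / 1000.0} s")
    }
  }
})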

3. How much time it took to persist those final joins to Hive / HDFS

Fair question. Check out Ganglia:

Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.

Another trick I like to use is defining every sequence of transformations that must end in an action inside a separate function, and then calling that function on the input RDD inside a "timer" block.

For instance, my "timer" is defined as follows:

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block   // evaluate the by-name block; this is where the work happens
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0)/1e9 + "s")
  result
}

and can be used as

scala> val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")

scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2
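The same timer can wrap the write step from question 3. Here joined and the output table name are just placeholders:

// Sketch: time the persist-to-Hive step.
time { joined.write.mode("overwrite").saveAsTable("joined_output") }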

However, don't call unnecessary actions just to break the DAG into more stages/wide dependencies. This might lead to extra shuffles or slow down your execution.

Resources:

https://spark.apache.org/docs/latest/monitoring.html

http://ganglia.sourceforge.net/

https://www.youtube.com/watch?v=49Hr5xZyTEA

Upvotes: 3
