Reputation: 63259
In an attempt to reduce the time spent on gathering count
s of DataFrame
rows the RDD.countApproximate()
is being invoked. It has the following signature:
def countApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
I have attempted to limit the output calculation to sixty seconds. Notice also the very low accuracy requirement of 0.10
val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
However the actual time is .. 17 minutes ??
That time is almost the same as what was required to generate the data in the first place (19 minutes) !
So then - what is the use of this api: is there any way to get it to actually save some meaningful fraction of the exact time calculation?
TL;DR (See accepted answer): use initialValue
instead of getFinalValue
Upvotes: 8
Views: 1884
Reputation: 1189
Please note the return type in the approxCount
definition. It's a partial result.
def countApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
Now, please pay attention on how it is being used:
val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).**getFinalValue**.mean
According to spark scala doc, getFinalValue
is blocking method which means it will wait for complete operation to finish.
Whereas initialValue
can be fetched within specified timeout. So the following snippet will not block further operations after timeout,
val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
Please note the downside of using countApprox(timeout, confidence).initialValue
is that even after getting the value, it will continue counting till it get final count that you would have obtained using getFinalValue
and still will hold the resources till operation is complete.
Now the use of this api is not to get blocked at count operation.
Reference:[email protected]%3E
Now lets validate our assumption of non blocking operation on spark2-shell. Lets create random dataframe and perform count
, approxCount
with getFinalValue
and approxCount
with initialValue
scala> val schema = StructType((0 to 10).map(n => StructField(s"column_$n", StringType)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(column_0,StringType,true), StructField(column_1,StringType,true), StructField(column_2,StringType,true), StructField(column_3,StringType,true), StructField(column_4,StringType,true), StructField(column_5,StringType,true), StructField(column_6,StringType,true), StructField(column_7,StringType,true), StructField(column_8,StringType,true), StructField(column_9,StringType,true), StructField(column_10,StringType,true))
scala> val rows = spark.sparkContext.parallelize(Seq[Row](), 100).mapPartitions { _ => { Range(0, 100000).map(m => Row( => Random.alphanumeric.filter(_.isLower).head.toString).toList: _*)).iterator } }
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at mapPartitions at <console>:32
scala> val inputDf = spark.sqlContext.createDataFrame(rows, schema)
inputDf: org.apache.spark.sql.DataFrame = [column_0: string, column_1: string ... 9 more fields]
//Please note that cnt will be displayed only when all tasks are completed
scala> val cnt = inputDf.rdd.count
cnt: Long = 10000000
scala> val waitSecs = 60
waitSecs: Int = 60
//cntApproxFinal will be displayed only when all tasks are completed.
scala> val cntApprxFinal = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
[Stage 1:======================================================> (98 + 2) / 100]cntApprxFinal: Double = 1.0E7
scala> val waitSecs = 60
waitSecs: Int = 60
//Please note that cntApprxInitila in this case, will be displayed exactly after timeout duration. In this case 80 tasks were completed within timeout and it displayed the value of variable. Even after displaying the variable value, it continued will all the remaining tasks
scala> val cntApprxInitial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
[Stage 2:============================================> (80 + 4) / 100]cntApprxInitial: Double = 1.0E7
[Stage 2:=======================================================>(99 + 1) / 100]
Let's have look at spark ui and spark-shell, all 3 operations took same time:
is available before completion of all tasks.
Hope, this helps!
Upvotes: 12