Reputation: 1416
I am trying to find a reliable way to compute the size (in bytes) of a Spark dataframe programmatically.
The reason is that I would like to have a method to compute an "optimal" number of partitions ("optimal" could mean different things here: it could mean having an optimal partition size, or resulting in an optimal file size when writing to Parquet tables - but both can be assumed to be some linear function of the dataframe size). In other words, I would like to call coalesce(n)
or repartition(n)
on the dataframe, where n
is not a fixed number but rather a function of the dataframe size.
Other topics on SO suggest using SizeEstimator.estimate
from org.apache.spark.util
to get the size in bytes of the dataframe, but the results I'm getting are inconsistent.
First of all, I'm persisting my dataframe to memory:
df.cache().count
The Spark UI shows a size of 4.8GB in the Storage tab. Then, I run the following command to get the size from SizeEstimator
:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
This gives a result of 115'715'808 bytes =~ 116MB. However, applying SizeEstimator
to different objects leads to very different results. For instance, I try computing the size separately for each row in the dataframe and sum them:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[AnyRef])).reduce(_ + _)
This results in a size of 12'084'698'256 bytes =~ 12GB. Or, I can try to apply SizeEstimator
to every partition:
df.mapPartitions(
  iterator => Seq(SizeEstimator.estimate(
    iterator.toList.map(row => row.asInstanceOf[AnyRef]))).toIterator
).reduce(_ + _)
which results again in a different size of 10'792'965'376 bytes =~ 10.8GB.
I understand there are memory optimizations / memory overhead involved, but after performing these tests I don't see how SizeEstimator
can be used to get a sufficiently good estimate of the dataframe size (and consequently of the partition size, or resulting Parquet file sizes).
What is the appropriate way (if any) to apply SizeEstimator
in order to get a good estimate of a dataframe size or of its partitions? If there isn't any, what is the suggested approach here?
Upvotes: 54
Views: 70539
Reputation: 11
Based on hyriu's answer and 100chou's answer, I have created a PySpark helper library RepartiPy.
Now you can easily get the optimal number of partitions
and do a repartition based on that value as follows:
import repartipy

one_gib_in_bytes = 1073741824

# Use this if you have enough (executor) memory to cache the whole DataFrame.
# If you do NOT have enough memory (i.e. the DataFrame is too large), use 'repartipy.SamplingSizeEstimator' instead.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    # do your repartition & write
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
Please see the docs for more details.
Upvotes: 0
Reputation: 106
Answering here because I don't have enough rep to comment on the accepted answer.
As of this commit on Spark core, the method executePlan takes two parameters: the logicalPlan and the mode.
Adding to hyriu's answer, for PySpark:
df.cache().foreach(lambda x: x)

spark._jsparkSession.sessionState() \
    .executePlan(
        df._jdf.queryExecution().logical(),
        df._jdf.queryExecution().mode()
    ).optimizedPlan() \
    .stats() \
    .sizeInBytes()
Upvotes: 7
Reputation: 1416
Unfortunately, I was not able to get reliable estimates from SizeEstimator
, but I could find another strategy - if the dataframe is cached, we can extract its size from queryExecution
as follows:
df.cache.foreach(_ => ())
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
  catalyst_plan).optimizedPlan.stats.sizeInBytes
For the example dataframe, this gives exactly 4.8GB (which also corresponds to the file size when writing to an uncompressed Parquet table).
This has the disadvantage that the dataframe needs to be cached, but it is not a problem in my case.
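To tie this back to the original question, here is a minimal sketch of how the estimate could drive the partition count before writing to Parquet; the 128 MB target and the output path are just placeholders, not part of the method above:
// Hypothetical target: aim for roughly 128 MB per partition / output file
val targetPartitionSizeInBytes = 128L * 1024 * 1024

// stats.sizeInBytes is a BigInt, so convert before dividing
val numPartitions = math.ceil(df_size_in_bytes.toDouble / targetPartitionSizeInBytes).toInt.max(1)

df.repartition(numPartitions).write.parquet("/path/to/output")  // placeholder path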
EDIT: Replaced df.cache.foreach(_=>_)
by df.cache.foreach(_ => ())
, thanks to @DavidBenedeki for pointing it out in the comments.
Upvotes: 44
Reputation: 159
My suggestion is:
from sys import getsizeof

def compare_size_two_object(one, two):
    '''compare the size of two objects in bytes'''
    print(getsizeof(one), 'versus', getsizeof(two))
Upvotes: -3
Reputation: 29185
Apart from SizeEstimator, which you have already tried (good insight), below is another option:
RDDInfo[] getRDDStorageInfo()
Return information about what RDDs are cached, whether they are in memory or on disk, how much space they take, etc.
The Spark UI's Storage tab actually uses this. Spark docs
Below is the implementation from Spark:
/**
 * :: DeveloperApi ::
 * Return information about what RDDs are cached, if they are in mem or on disk, how much space
 * they take, etc.
 */
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
  getRDDStorageInfo(_ => true)
}

private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
  assertNotStopped()
  val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
  rddInfos.foreach { rddInfo =>
    val rddId = rddInfo.id
    val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
    rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
    rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
    rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
  }
  rddInfos.filter(_.isCached)
}
yourRDD.toDebugString
from RDD also uses this. code here
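For reference, a rough sketch of calling it directly from user code; note that it is a DeveloperApi and it reports all cached RDDs, not just the one backing your DataFrame:
// Cache and materialize the DataFrame so it shows up in the storage info
df.cache().count()

// Sum the in-memory and on-disk footprint of everything currently cached
val cachedSizeInBytes = spark.sparkContext.getRDDStorageInfo
  .filter(_.isCached)
  .map(info => info.memSize + info.diskSize)
  .sum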
In my opinion, to get the optimal number of records in each partition, and to check that your repartitioning is correct and the records are uniformly distributed, I would suggest trying something like the code below and then adjusting your repartition number. Measuring the partition size this way is a more sensible way to address this kind of problem.
yourdf.rdd.mapPartitionsWithIndex { case (index, rows) => Iterator((index, rows.size)) }
  .toDF("PartitionNumber", "NumberOfRecordsPerPartition")
  .show
or use existing Spark functions (depending on your Spark version):
import org.apache.spark.sql.functions._
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show
Upvotes: 8
Reputation: 2232
SizeEstimator
returns the number of bytes an object takes up on the JVM heap. This includes objects referenced by the object; the actual object size will almost always be much smaller.
The discrepancies in the sizes you've observed occur because when you create new objects on the JVM, the references take up memory too, and this is being counted.
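As a rough illustration of that overhead (the exact number depends on the JVM and Spark version; Point is just a throwaway example class):
import org.apache.spark.util.SizeEstimator

// Two Doubles hold 16 bytes of raw data, but the heap estimate also counts
// the object header, field layout and alignment padding.
case class Point(x: Double, y: Double)
println(SizeEstimator.estimate(Point(1.0, 2.0)))  // noticeably more than 16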
Check out the docs here 🤩
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator$
Upvotes: 10