sag

Reputation: 5461

How can I find the size of an RDD?

I have an RDD[Row] which needs to be persisted to a third-party repository, but this repository accepts a maximum of 5 MB in a single call.

So I want to create partitions based on the size of the data present in the RDD, not on the number of rows.

How can I find the size of an RDD and create partitions based on it?

Upvotes: 28

Views: 54263

Answers (5)

warrens

Reputation: 2145

This is the version to use if you are actually working with big data on a cluster -- i.e., it eliminates the collect().

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def calcRDDSize(rdd: RDD[Row]): Long = {
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
     .reduce(_ + _) // add the sizes together
}

def estimateRDDSize(rdd: RDD[Row], fraction: Double): Long = {
  val sampleRDD = rdd.sample(true, fraction)
  val sampleRDDSize = calcRDDSize(sampleRDD)
  println(s"sampleRDDSize is ${sampleRDDSize / (1024 * 1024)} MB")

  val sampleAvgRowSize = sampleRDDSize / sampleRDD.count()
  println(s"sampleAvgRowSize is $sampleAvgRowSize")

  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")

  val estimatedTotalSize = totalRows * sampleAvgRowSize
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize / (1024 * 1024))
  println(s"estimatedTotalSize is $estimateInMB MB")

  estimatedTotalSize // the last expression is the return value; `return` is unneeded
}

// estimate using 15% of the data
val size = estimateRDDSize(df.rdd, 0.15)
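To connect this back to the question's 5 MB-per-call limit, the estimate can be turned into a partition count. This is a rough sketch, not part of the original answer; note that repartition splits by rows, so per-partition sizes are only approximately even:

// Rough sketch: pick a partition count so the *average* partition
// stays under the 5 MB cap (approximate, not a hard guarantee).
val maxBytesPerCall = 5L * 1024 * 1024
val numPartitions = math.max(1, math.ceil(size.toDouble / maxBytesPerCall).toInt)
val chunked = df.rdd.repartition(numPartitions)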

Upvotes: 0

Yiying Wang

Reputation: 171

I think RDD.count() will give you the number of elements in the RDD.

Upvotes: 6

Haiying Wang

Reputation: 652

One straightforward way is to call one of the following, depending on whether you want to store your data in serialized form or not. Then go to the Spark UI "Storage" page, where you should be able to see the total size of the RDD (memory + disk):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)

or

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

It is not easy to calculate an accurate memory size at runtime. You may try to do an estimation at runtime, though, based on data sampled offline: say X rows used Y GB offline, then Z rows at runtime may take Z*Y/X GB. This is similar to what Justin suggested earlier.
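For example, with made-up numbers just to make the formula concrete:

// Illustrative only: X = 1,000,000 rows measured offline used Y = 2 GB,
// so Z = 5,000,000 rows at runtime are estimated at Z*Y/X = 10 GB.
val offlineRows = 1000000L
val offlineGB   = 2.0
val runtimeRows = 5000000L
val estimatedGB = runtimeRows * offlineGB / offlineRows // 10.0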

Hope this helps.

Upvotes: 7

sag

Reputation: 5461

As Justin and Wang mentioned, it is not straightforward to get the size of an RDD; we can only make an estimate.

We can sample an RDD and then use SizeEstimator to get the size of the sample. As Wang and Justin mentioned: if X rows of data sampled offline used Y GB, then Z rows at runtime may take Z*Y/X GB.

Here is sample Scala code to get the size/estimate of an RDD.

I am new to Scala and Spark, so the sample below may be written in a better way.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.util.SizeEstimator

def getTotalSize(rdd: RDD[Row]): Long = {
  // The sample size can be made a parameter
  val NO_OF_SAMPLE_ROWS = 10L
  val totalRows = rdd.count()
  var totalSize = 0L
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    // sample() expects a fraction, not a row count, so derive the fraction
    val fraction = NO_OF_SAMPLE_ROWS.toDouble / totalRows
    val sampleRDD = rdd.sample(withReplacement = true, fraction = fraction)
    val sampleRows = sampleRDD.count()
    if (sampleRows > 0) {
      // extrapolate from the rows actually sampled
      totalSize = getRDDSize(sampleRDD) * totalRows / sampleRows
    }
  } else {
    // The RDD is smaller than the sample size; just measure it directly
    totalSize = getRDDSize(rdd)
  }

  totalSize
}

def getRDDSize(rdd: RDD[Row]): Long = {
  // collect() is acceptable here because only a small sample is ever passed in
  rdd.collect()
     .map(row => SizeEstimator.estimate(row.toSeq.map(_.asInstanceOf[AnyRef])))
     .sum
}

Upvotes: 15

Justin Pihony

Reputation: 67135

This is going to depend on factors such as serialization, so it is not cut and dried. However, you could take a sample set, run some experimentation on that sample data, and extrapolate from there.

Upvotes: 3
