Laodao

Reputation: 1709

How to get the number of executors and number of cores in Java Spark

I am new to Spark and we are coding in Java now. The problem is that we are trying to figure out the number of executors and the number of cores. I googled and found some articles mentioning that this is the way to do it in Spark (see below), but I didn't see anything similar for Java (JavaSparkContext doesn't have getExecutorMemoryStatus or getExecutorStorageStatus). Can anyone help, please?

// for the number of executors
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(!_.split(":")(0).equals(driverHost)).toList
}

// for the number of cores per executor
sc.getConf.get("spark.executor.cores").toInt
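For reference, here is a rough Java sketch of the same idea (untested; it assumes the underlying Scala SparkContext is reachable via JavaSparkContext.sc() and that getExecutorMemoryStatus is still present in your Spark version):

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;

public class ExecutorInfoSketch {
    public static void print(JavaSparkContext javaSparkContext) {
        // JavaSparkContext just wraps the Scala SparkContext; sc() exposes it.
        SparkContext sc = javaSparkContext.sc();

        // One entry per block manager, including the driver, hence the minus one.
        int executorCount = sc.getExecutorMemoryStatus().size() - 1;

        // Cores per executor as configured; "1" is only a fallback for local runs.
        int coresPerExecutor = Integer.parseInt(sc.getConf().get("spark.executor.cores", "1"));

        System.out.println(executorCount + " executors, " + coresPerExecutor + " cores each");
    }
}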

Upvotes: 0

Views: 1941

Answers (2)

Xia Gong

Reputation: 1

I tried the above answer and found that the executor count can be fetched this way:

import org.apache.spark.SparkEnv

// Count of block managers known to the driver; note this includes the driver itself.
val executorCount = SparkEnv.get.blockManager.master.getStorageStatus.length

Upvotes: 0

Som

Reputation: 6338

My answer is mostly based on this SO answer. Recently, getExecutorStorageStatus was removed from SparkContext (in newer versions of Spark), so you can no longer use sc.getExecutorStorageStatus. Instead we can use SparkEnv's blockManager.master.getStorageStatus.length - 1 (the minus one is again for the driver). The normal way to reach SparkEnv is via the env field of SparkContext, but it is not accessible outside of the org.apache.spark package. Therefore, we use an encapsulation-violation pattern:

package org.apache.spark.util

import org.apache.spark.{SparkContext, SparkEnv}

/**
  * sc.env and ThreadUtils are private[spark], so they are not accessible outside
  * of the org.apache.spark package. Placing this helper inside that package is an
  * encapsulation-violation pattern that exposes them.
  */
object SparkInternalUtils {

  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
  def getThreadUtils: ThreadUtils.type = ThreadUtils

}

Now we can get the SparkEnv instance using SparkInternalUtils.sparkEnv(sc).

Define RichSparkContext as below:


import org.apache.spark.SparkContext
import org.apache.spark.util.SparkInternalUtils

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    SparkInternalUtils.sparkEnv(sc).blockManager.master.getStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}


object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      // Run a tiny job on one executor and read its available processors;
      // cache the result so the job only runs once.
      if (_coresPerExecutor == 0)
        _coresPerExecutor =
          sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }

}

In Scala, get the number of executors and the core count:

val sc = ... // SparkContext instance
import RichSparkContext.implicits._
val executorCount = sc.executorCount
val coresPerExecutor = sc.coresPerExecutor
val totalCoreCount = sc.coreCount

In Java, get the number of executors and the core count:

JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
RichSparkContext richSparkContext = new RichSparkContext(javaSparkContext.sc());
System.out.println(richSparkContext.coresPerExecutor());
System.out.println(richSparkContext.coreCount());
System.out.println(richSparkContext.executorCount());
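
If you would rather avoid the package trick altogether, a lighter option is to go through the status tracker. This is only a sketch: it assumes Spark 2.0+ (where SparkStatusTracker.getExecutorInfos() is available) and that the driver shows up as one of the reported entries.

import org.apache.spark.SparkContext;
import org.apache.spark.SparkExecutorInfo;

public class StatusTrackerSketch {
    public static void print(SparkContext sc) {
        // Every executor known to the status tracker, typically including the driver.
        SparkExecutorInfo[] infos = sc.statusTracker().getExecutorInfos();
        int executorCount = infos.length - 1; // drop the driver entry

        // Cores per executor as configured; "1" is only a fallback for local runs.
        int coresPerExecutor = Integer.parseInt(sc.getConf().get("spark.executor.cores", "1"));

        System.out.println(executorCount + " executors * " + coresPerExecutor + " cores");
    }
}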

Upvotes: 1
