Reputation: 1709
I am new to Spark and we are coding in Java now. The problem is that we are trying to figure out the number of executors and the number of cores per executor. I googled and saw some articles mentioning that the way to do it in Spark is as below, but I didn't see anything similar for Java (JavaSparkContext has neither getExecutorMemoryStatus nor getExecutorStorageStatus). Can anyone help, please?
// for executor numbers
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
val driverHost: String = sc.getConf.get("spark.driver.host")
allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
}
// for executor core numbers
sc.getConf.get("spark.executor.cores").toInt
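For what it's worth, the Scala methods quoted above live on SparkContext, which Java code can reach through JavaSparkContext.sc(). A minimal, untested sketch of a Java equivalent (the ExecutorInfo class name is made up here; it assumes a Spark version that still exposes getExecutorMemoryStatus and that spark.executor.cores was set explicitly):
import org.apache.spark.api.java.JavaSparkContext;

public final class ExecutorInfo {
    // getExecutorMemoryStatus lives on the underlying Scala SparkContext,
    // reachable from Java through JavaSparkContext.sc().
    public static void print(JavaSparkContext jsc) {
        // one map entry per block manager: every executor plus the driver
        int executorCount = jsc.sc().getExecutorMemoryStatus().size() - 1;

        // throws NoSuchElementException if spark.executor.cores was not set
        int coresPerExecutor = Integer.parseInt(jsc.getConf().get("spark.executor.cores"));

        System.out.println("executors: " + executorCount
                + ", cores per executor: " + coresPerExecutor);
    }
}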
Upvotes: 0
Views: 1941
Reputation: 1
Tried the above answer and found that the executor count can be fetched this way:
import org.apache.spark.SparkEnv
val executorCount = SparkEnv.get.blockManager.master.getStorageStatus.length
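Since the question asks about Java: Scala's private[spark] modifier is enforced only by the Scala compiler, not in the bytecode, so Java code can usually call the same internals directly. A hedged, untested sketch (the ExecutorCount class name is made up, and these are Spark internals that may change or disappear between versions):
import org.apache.spark.SparkEnv;

public final class ExecutorCount {
    // Must run on the driver of an active Spark application, where
    // SparkEnv.get() is non-null.
    public static int executorCount() {
        // blockManager(), master() and getStorageStatus() are private[spark]
        // in Scala, but that restriction is invisible to the Java compiler.
        // getStorageStatus() returns one entry per block manager, i.e. every
        // executor plus the driver.
        return SparkEnv.get().blockManager().master().getStorageStatus().length;
    }
}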
Upvotes: 0
Reputation: 6338
My answer is mostly based on this SO answer.
Recently, getExecutorStorageStatus was removed from SparkContext (in newer versions of Spark), so you can't use sc.getExecutorStorageStatus. Instead we can use SparkEnv's blockManager.master.getStorageStatus.length - 1 (the minus one is for the driver again). The normal way to get at it is via the env field of SparkContext, but env is not accessible outside of the org.apache.spark package. Therefore, we use an encapsulation violation pattern:
package org.apache.spark.util
import org.apache.spark.{SparkContext, SparkEnv}
/**
 * sc.env and ThreadUtils are private to the org.apache.spark package, so this
 * object is declared inside it (in the util subpackage) to re-expose them.
 */
object SparkInternalUtils {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
def getThreadUtils: ThreadUtils.type = ThreadUtils
}
Now we can get the SparkEnv instance via SparkInternalUtils.sparkEnv(sc).
Define RichSparkContext as below:
import org.apache.spark.SparkContext
import org.apache.spark.util.SparkInternalUtils
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
SparkInternalUtils.sparkEnv(sc).blockManager.master.getStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
  synchronized {
    if (_coresPerExecutor == 0)
      // run a one-element job and cache what the executor reports
      _coresPerExecutor = sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
    _coresPerExecutor
  }
}
In Scala, get the number of executors and the core count:
val sc = ... // SparkContext instance
import RichSparkContext.implicits._
val executorCount = sc.executorCount
val coresPerExecutor = sc.coresPerExecutor
val totalCoreCount = sc.coreCount
In Java, get the number of executors and the core count:
// spark is an existing SparkSession
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
RichSparkContext richSparkContext = new RichSparkContext(javaSparkContext.sc());
System.out.println(richSparkContext.coresPerExecutor());
System.out.println(richSparkContext.coreCount());
System.out.println(richSparkContext.executorCount());
Upvotes: 1