tribbloid

Reputation: 3838

Method to get the number of cores for an executor on a task node?

E.g. I need to get a list of all available executors and their respective multithreading capacity (NOT the total multithreading capacity; sc.defaultParallelism already handles that).

Since this parameter is implementation-dependent (YARN and Spark standalone have different strategies for allocating cores) and situational (it may fluctuate because of dynamic allocation and long-running jobs), I cannot estimate it by other means. Is there a way to retrieve this information using the Spark API in a distributed transformation (e.g. TaskContext, SparkEnv)?

UPDATE: As of Spark 1.6, I have tried the following methods:

1) Run a 1-stage job with many partitions (>> defaultParallelism) and count the number of distinct thread IDs per executor ID:

import org.apache.spark.SparkEnv

val n = sc.defaultParallelism * 16
sc.parallelize(1 to n, n)
  .map(_ => SparkEnv.get.executorId -> Thread.currentThread().getId)
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()

This, however, leads to an estimate higher than the actual multithreading capacity, because each Spark executor uses an over-provisioned thread pool.

2) Similar to 1), except that n = defaultParallelism, and in every task I add a delay to prevent the resource negotiator from imbalanced sharding (a fast node completes its task and asks for more before slow nodes can start running):

val n = sc.defaultParallelism
sc.parallelize(1 to n, n)
  .map { _ =>
    Thread.sleep(5000)
    SparkEnv.get.executorId -> Thread.currentThread().getId
  }
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()

It works most of the time, but it is much slower than necessary and may be broken by a very imbalanced cluster or by task speculation.

3) I haven't tried this: use Java reflection to read BlockManager.numUsableCores. This is obviously not a stable solution; the internal implementation may change at any time. A rough sketch is below.
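A minimal sketch of what that reflection attempt could look like, assuming numUsableCores is actually retained as a field on BlockManager (it is only a constructor parameter, so the lookup may simply fail on a given Spark version):

import scala.util.Try
import org.apache.spark.SparkEnv

// Best-effort reflective read; returns None if the field does not exist
// or is not accessible in the running Spark version.
def usableCoresViaReflection(): Option[Int] = Try {
  val bm = SparkEnv.get.blockManager
  val field = bm.getClass.getDeclaredField("numUsableCores")
  field.setAccessible(true)
  field.getInt(bm)
}.toOption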

Please tell me if you have found something better.

Upvotes: 7

Views: 598

Answers (2)

Vitalii Kotliarenko

Reputation: 2967

I would try to implement a SparkListener, in a way similar to what the web UI does. This code might be helpful as an example.
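For illustration, a minimal sketch of such a listener (not from the answer itself) that tracks each executor's core count via the onExecutorAdded/onExecutorRemoved events and is registered on the driver with sc.addSparkListener:

import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

class ExecutorCoresListener extends SparkListener {
  // executorId -> number of cores reported when the executor registered
  val coresByExecutor = TrieMap.empty[String, Int]

  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    coresByExecutor(event.executorId) = event.executorInfo.totalCores
  }

  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    coresByExecutor.remove(event.executorId)
  }
}

val listener = new ExecutorCoresListener
sc.addSparkListener(listener)
// later: listener.coresByExecutor holds executorId -> cores, updated as executors join/leave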

Upvotes: 1

Alper t. Turker

Reputation: 35229

It is pretty easy with the Spark REST API. You have to get the application id:

val applicationId = spark.sparkContext.applicationId

the UI URL:

val baseUrl = spark.sparkContext.uiWebUrl

and build the query URL:

val url = baseUrl.map { url => 
  s"${url}/api/v1/applications/${applicationId}/executors"
}

With the Apache HTTP library (already among Spark's dependencies; adapted from https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):

import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

// Fetch the executors endpoint and read the response body as a single JSON string
val response = url
  .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption)
  .flatMap(response => Try{
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close()
    json
  }.toOption)

and parse it with json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

case class ExecutorInfo(hostPort: String, totalCores: Int)

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)
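For illustration, a small usage sketch with the names defined above (not part of the original answer):

// Print per-executor core counts and the total across the cluster
executors.getOrElse(Nil).foreach { e =>
  println(s"${e.hostPort}: ${e.totalCores} cores")
}
val totalCores = executors.getOrElse(Nil).map(_.totalCores).sum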

As long as you keep the application id and the UI URL at hand, and open the UI port to external connections, you can do the same thing from any task.

Upvotes: 1
