Reputation: 3838
For example, I need to get a list of all available executors and their respective multithreading capacity (NOT the total multithreading capacity; sc.defaultParallelism already handles that).
Since this parameter is implementation-dependent (YARN and Spark standalone have different strategies for allocating cores) and situational (it may fluctuate because of dynamic allocation and long-running jobs), I cannot use other methods to estimate it. Is there a way to retrieve this information using the Spark API in a distributed transformation (e.g. TaskContext, SparkEnv)?
UPDATE: As of Spark 1.6, I have tried the following methods:
1) Run a one-stage job with many partitions (>> defaultParallelism) and count the number of distinct thread IDs for each executor ID:
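import org.apache.spark.SparkEnv

val n = sc.defaultParallelism * 16
// One element per partition, so each task reports the thread it ran on
sc.parallelize(1 to n, n)
  .map(_ => SparkEnv.get.executorId -> Thread.currentThread().getId)
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()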
This, however, leads to an estimate higher than the actual multithreading capacity, because each Spark executor uses an overprovisioned thread pool.
2) Similar to 1, except that n = defaultParallelism, and in every task I add a delay to prevent the resource negotiator from imbalanced sharding (a fast node completes its task and asks for more before slow nodes can start running):
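import org.apache.spark.SparkEnv

val n = sc.defaultParallelism
sc.parallelize(1 to n, n)
  .map { _ =>
    // Hold each slot long enough for all tasks to be scheduled concurrently
    Thread.sleep(5000)
    SparkEnv.get.executorId -> Thread.currentThread().getId
  }
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()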
It works most of the time, but it is much slower than necessary and may be broken by a very imbalanced cluster or by task speculation.
3) I haven't tried this: use Java reflection to read BlockManager.numUsableCores. This is obviously not a stable solution, since the internal implementation may change at any time.
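Methods 1 and 2 both end with the distinct thread IDs observed per executor ID, so the capacity estimate is the size of each group. A minimal sketch of that last step in plain Scala follows; the object and function names are hypothetical and the sample IDs are made up for illustration.

```scala
// Hypothetical helper, not part of Spark: reduces the collected
// (executorId -> threadIds) pairs to an estimated thread count per executor.
object ThreadCount {
  def threadsPerExecutor(collected: Seq[(String, Iterable[Long])]): Map[String, Int] =
    collected.map { case (executorId, threadIds) =>
      executorId -> threadIds.toSet.size // duplicates collapse in the Set
    }.toMap

  def main(args: Array[String]): Unit = {
    // Made-up sample shaped like the result of .collect()
    val sample = Seq("1" -> Seq(41L, 42L, 41L), "2" -> Seq(57L))
    println(threadsPerExecutor(sample))
  }
}
```

With the real result, something like ThreadCount.threadsPerExecutor(result.toSeq) should work, where result is the collected array. The estimate inherits the caveats of whichever method produced the thread IDs.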
Please tell me if you have found something better.
Upvotes: 7
Views: 598
Reputation: 2967
I would try to implement a SparkListener, similar to what the web UI does. This code might be helpful as an example.
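A rough sketch of what such a listener could look like, assuming the onExecutorAdded and onExecutorRemoved callbacks and the totalCores field of the scheduler's ExecutorInfo are available in your Spark version. The class name ExecutorCoresListener and its snapshot method are hypothetical, not part of Spark.

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Hypothetical listener: tracks the core count of each live executor.
class ExecutorCoresListener extends SparkListener {
  // Listener callbacks and readers may run on different threads
  private val cores = TrieMap.empty[String, Int]

  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
    cores.put(event.executorId, event.executorInfo.totalCores)

  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit =
    cores.remove(event.executorId)

  // Immutable copy of executorId -> totalCores at the time of the call
  def snapshot: Map[String, Int] = cores.toMap
}
```

It could be registered with sc.addSparkListener(new ExecutorCoresListener). Executors that registered before the listener was attached would probably be missed, so attaching it as early as possible seems safer. The listener runs on the driver, so the snapshot would still have to be shipped to tasks (for example through a closure or a broadcast) to be used inside a transformation.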
Upvotes: 1
Reputation: 35229
It is pretty easy with the Spark REST API. You have to get the application ID:
val applicationId = spark.sparkContext.applicationId
the UI URL:
val baseUrl = spark.sparkContext.uiWebUrl
and query:
val url = baseUrl.map { url =>
  s"${url}/api/v1/applications/${applicationId}/executors"
}
With the Apache HTTP library (already among Spark's dependencies, adapted from https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

val response = url
  .flatMap(url => Try { client.execute(new HttpGet(url)) }.toOption)
  .flatMap(response => Try {
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close
    json
  }.toOption)
and json4s:
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

case class ExecutorInfo(hostPort: String, totalCores: Int)

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)
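The executors endpoint usually lists the driver as well, which is not a worker executor. Assuming each entry also carries an id field and the driver's entry uses the id "driver" (worth verifying against your Spark version), a sketch of reducing the parsed list to a per-executor core count might look like this. The ExecutorCores class and byExecutor function are hypothetical names introduced for illustration.

```scala
// Assumed shape: the answer's ExecutorInfo plus the `id` field of each entry.
final case class ExecutorCores(id: String, hostPort: String, totalCores: Int)

object ExecutorCores {
  // Drops the driver entry (assumed id "driver") and indexes cores by executor id.
  def byExecutor(all: List[ExecutorCores]): Map[String, Int] =
    all.filterNot(_.id == "driver").map(e => e.id -> e.totalCores).toMap
}
```

Extracting List[ExecutorCores] instead of List[ExecutorInfo] in the json4s step and passing the result to ExecutorCores.byExecutor would then give a map from executor ID to core count.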
As long as you keep the application ID and the UI URL at hand and open the UI port to external connections, you can do the same thing from any task.
Upvotes: 1