Codious-JR

Reputation: 1748

Configuring Spark Executors on GCP Dataproc

I am not able to configure YARN and Spark to utilize all the resources on my Dataproc Spark cluster on GCP.

I am running a cluster with 1 master (4 cores) and 2 workers (16 cores each), and I want my Spark application to use 30 of the 32 cores available on the worker instances. But the YARN UI says that only 5 cores are being used, while the Spark Executors UI says 20 cores are being used. When I look at the CPU activity on the workers, there is hardly any.

I am utterly confused. Please help.

The command to create the Dataproc cluster:

gcloud dataproc clusters create default-cluster \
        --region europe-west1 --subnet default --zone europe-west1-d \
        --master-machine-type n1-standard-4 --master-boot-disk-size 500 \
        --num-workers 2 --worker-machine-type n1-standard-16 --worker-boot-disk-size 500 \
        --image-version 1.4-debian9 \
        --project product-normalyzr

The command to submit the job:

gcloud dataproc jobs submit spark --cluster=default-cluster \
    --region=europe-west1 \
    --properties=spark.executor.instances=6,spark.executor.cores=5,spark.executor.memory=14g \
    --jars=dist/yzr-core-scala_2.11-0.1.jar,dist/deps/gson-2.8.6.jar,dist/deps/picocli-4.2.0.jar \
    --class=solutions.yzr.pnormalyzr.similarity.Main

The way I am creating the Spark session:

def buildSession(appName: String): SparkSession = {
    SparkSession
        .builder()
        .appName(appName)
        .getOrCreate()
}

In case the problem is linked to the Spark logic (maybe partitioning or something similar), I am also providing the major part of the Spark app code. I doubt this is the reason, because when I run it locally on my machine the CPU usage explodes, and that is what I expect to see on the worker nodes.

println("Load features")
val features = session.sparkContext.textFile(inputPath)
    .map((rawText: String) => {
    new Gson().fromJson(rawText, classOf[DocFeatures])
    })

features.take(5).foreach(println)

println("Compute Scores")
val scores = features.cartesian(features)
    // compute similarity
    .map((d: (DocFeatures, DocFeatures)) => {
    val docA = d._1
    val docB = d._2

    val (score, explain) = SimilarityMetric.score(docA, docB)
    SimilarityScore(
        pA = docA.slug,
        pB = docB.slug,
        score = score,
        explain = explain)
    })
    // filter items with no similarity
    .filter(s => s.score > 0)

scores.take(5).foreach(println)

println("Export")
// store to disk
val scoreStrings = scores.map(d => {
    new Gson().toJson(d)
})
scoreStrings.take(5).foreach(println)

scoreStrings.saveAsTextFile(outputPath)

session.close()
println("End")

The YARN UI says that only 5 vcores are allocated, whereas I wanted to allocate 6 instances with 5 cores each, so 30 cores in total.

YARN UI

On the Spark Job UI it says that only 4 executors were added whereas I wanted 6 executor instances.

Spark UI

On the Spark Executors UI it says that the 4 executors are allocated 5 cores each, which corresponds to my setting, but when I look at the CPU usage on the workers there is absolutely no activity there.

Spark Executors UI

Htop shows no CPU activity on the worker nodes.

Htop Workers

I feel I am mixing up the different YARN and Spark settings. Any help would be truly appreciated.

Upvotes: 4

Views: 2438

Answers (2)

Shadman R

Reputation: 244

You have only 2 nodes with 16 vCores each, 32 vCores in total, which you can see in your YARN UI.

When you submit your job, you are asking YARN to create 6 containers (executors) with 5 vCores each, but a single node can hold at most 2 executors given the 5-core requirement (10 of the 16 available vCores are used to create 2 executors on one worker node).

You will end up with at most 4 executors anyway. One executor can't span multiple worker nodes.
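
A rough way to check this kind of packing limit is to compute how many executors fit on one node by vcores and by memory, and take the smaller number. The sketch below assumes Spark's default executor memory overhead of max(384 MB, 10% of executor memory). The node memory values are hypothetical placeholders, not figures read from this cluster; the real limit is whatever yarn.nodemanager.resource.memory-mb is set to on the workers.

```scala
object ExecutorPacking {
  // Executors that fit on one node, bounded by both vcores and memory.
  // nodeMemMb is an assumption: use the NodeManager memory limit of your own workers.
  def executorsPerNode(nodeVcores: Int, nodeMemMb: Long, execCores: Int, execMemMb: Long): Int = {
    // On YARN, each executor container asks for heap plus overhead (default: max(384 MB, 10%)).
    val overheadMb = math.max(384L, (execMemMb * 0.10).toLong)
    val byCores = nodeVcores / execCores
    val byMemory = (nodeMemMb / (execMemMb + overheadMb)).toInt
    math.min(byCores, byMemory)
  }

  def main(args: Array[String]): Unit = {
    // 16 vcores, 5 cores and 14g per executor, with two hypothetical node memory sizes.
    println(executorsPerNode(16, 49152L, 5, 14336L)) // 3
    println(executorsPerNode(16, 40000L, 5, 14336L)) // 2
  }
}
```

Note that by vcores alone, 16 / 5 allows 3 executors per node, so a limit of 2 per node would have to come from memory or from capacity taken by other containers such as the application master.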

Upvotes: 0

mazaneicha

Reputation: 9417

Your running job #3 has only 4 tasks (screenshot #2), and that's why you see 4 executors. Spark doesn't need 6 executors to complete 4 tasks.
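
One plausible source of the 4 tasks, assuming the input file was read into 2 partitions (the usual default minimum for textFile): cartesian produces one partition, and therefore one task, per pair of input partitions. The partition count of 2 is an assumption, not something read from the screenshots.

```scala
object CartesianTasks {
  // RDD.cartesian yields (left partitions * right partitions) partitions,
  // and each partition becomes one task in the stage.
  def cartesianPartitions(left: Int, right: Int): Int = left * right

  def main(args: Array[String]): Unit = {
    val inputPartitions = 2 // assumed value; check features.getNumPartitions in the real job
    println(cartesianPartitions(inputPartitions, inputPartitions)) // 4
  }
}
```

In the actual job, features.getNumPartitions shows the real count, and passing a larger minPartitions to textFile or calling features.repartition(n) before cartesian would raise the number of tasks.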

Each executor (screenshot #3) has 5 cores and what looks like 14GB of memory ((14GB - 300MB) * 0.6 ~ 7.8GB). See Spark memory management.
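
The formula in parentheses can be written out as a small calculation. This is a sketch assuming the default spark.memory.fraction of 0.6 and the fixed 300 MB of reserved memory; the function name is made up for illustration.

```scala
object UnifiedMemory {
  val ReservedMb = 300L // fixed reserved memory in Spark's unified memory manager

  // Memory shared by execution and storage: (heap - reserved) * spark.memory.fraction
  def unifiedMemoryMb(heapMb: Long, memoryFraction: Double = 0.6): Double =
    (heapMb - ReservedMb) * memoryFraction

  def main(args: Array[String]): Unit = {
    // For a nominal 14g heap (14 * 1024 MB); prints a value in MB.
    println(unifiedMemoryMb(14L * 1024L))
  }
}
```

For a nominal 14 * 1024 MB heap this gives roughly 8.4 thousand MB. The figure shown in the UI can come out lower, plausibly because the JVM reports a usable heap somewhat below the configured size; I have not verified that for this cluster.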

Each executor executes a single task, which means it uses only one core out of 5 allocated, hence the low CPU usage. (In Spark, an executor with X cores can process X tasks in parallel. It can NOT process one task on X cores.)
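
To make the arithmetic concrete, here is a minimal sketch of how many cores can be busy at once: one core per running task, capped by the total cores of the executors. The helper names are hypothetical. Since the job in the question uses cartesian, the second function estimates how many partitions each side would need to produce at least a target number of tasks.

```scala
object CoreUtilization {
  // One task occupies one core, so busy cores = min(tasks, total executor cores).
  def busyCores(runningTasks: Int, executors: Int, coresPerExecutor: Int): Int =
    math.min(runningTasks, executors * coresPerExecutor)

  // For a cartesian of an RDD with itself, tasks = partitions * partitions,
  // so each side needs at least ceil(sqrt(targetTasks)) partitions.
  def minPartitionsPerSide(targetTasks: Int): Int =
    math.ceil(math.sqrt(targetTasks.toDouble)).toInt

  def main(args: Array[String]): Unit = {
    println(busyCores(4, 4, 5))       // 4: four tasks keep only 4 of 20 cores busy
    println(minPartitionsPerSide(30)) // 6: 6 * 6 = 36 tasks
    println(busyCores(36, 6, 5))      // 30: enough tasks to fill 6 executors with 5 cores
  }
}
```

In other words, raising the number of partitions (and therefore tasks) is what lets the allocated cores do work; adding executors or cores alone does not.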

Upvotes: 2
