Bin Lin

Reputation: 25

Pyspark Job Failure on Google Cloud Dataproc

I created a Dataproc cluster with 1 master and 10 worker nodes, all with the same configuration: 32 vCPUs and 120 GB of memory. When I submitted a job that processes a large amount of data and does heavy computation, the job failed.

From the logging, I am not really sure what caused the failure, but I saw this memory-related error message from Job# job-c46fc848-6: Container killed by YARN for exceeding memory limits. 24.1 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

So I tried a few solutions I found in other posts. For example, I increased spark.executor.memoryOverhead and spark.driver.maxResultSize in the "Properties" section when submitting the job from the "Jobs" console. Job# find-duplicate-job-c46fc848-7 still failed.
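For reference, this is roughly what those properties look like when set in the job script itself instead of the console (the app name and the memory values below are just placeholders, not the exact numbers I tried):

from pyspark.sql import SparkSession

# Placeholder values for illustration; the real job used the "Properties"
# section of the Dataproc "Jobs" console to set the same configs.
spark = (SparkSession.builder
         .appName("find-duplicate")
         .config("spark.executor.memoryOverhead", "4g")
         .config("spark.driver.maxResultSize", "8g")
         .getOrCreate())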

I was also seeing warning messages and am not sure what they mean: 18/06/04 17:13:25 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for rdd_43_155 !

I am going to try creating a more powerful cluster to see if that works, but I doubt it will solve the issue, since a cluster with 1 master and 10 nodes, each with 32 vCPUs and 120 GB of memory, is already very powerful.

Hope to get some help from advanced users and experts. Thanks in advance!

Upvotes: 0

Views: 2236

Answers (1)

Bin Lin

Reputation: 25

The root cause of the failure was memory pressure from the self cross join. The job kept failing even as I increased CPU and memory. The solution was a combination of the following:

  1. Use the repartition() function to repartition after the join and before the next transformation. This fixes the data skew issue. Ex: df_joined = df_joined.repartition(partitions)
  2. Broadcast the right table.
  3. Break the job into 10 iterations. In each iteration, process only 1/10 of the left table, joined with the full right table (see the sample code below).

See sample code:

from pyspark.sql.functions import broadcast

groups = 10
for x in range(0, groups):
    # Join 1/10 of the left table with the broadcast right table in each pass
    df_joined = df1.join(broadcast(df2), df1.authors == df2.authors) \
                   .where(df1.content_id % groups == x)
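To show how step 1 fits in, here is a sketch of the same loop with the repartition and a per-iteration write added (the partition count and the output path are placeholders made up for illustration):

from pyspark.sql.functions import broadcast

groups = 10
partitions = 2000  # placeholder; tune to the data volume

for x in range(0, groups):
    # Same join as above: broadcast the right table, 1/10 of the left table per pass
    chunk = (df1.join(broadcast(df2), df1.authors == df2.authors)
                .where(df1.content_id % groups == x)
                .repartition(partitions))      # step 1: even out skew after the join
    # Materialize each slice before starting the next iteration (placeholder path)
    chunk.write.mode("append").parquet("gs://your-bucket/df_joined/")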

With the above 3 methods combined, I was able to finish the job in 1.5 hours using only 1 master and 4 worker nodes (8 vCPUs and 30 GB of memory per VM).

Upvotes: 1
