Vijeth Kashyap

Reputation: 233

AWS Glue ExecutorLostFailure (executor 15 exited caused by one of the running tasks) Reason: Remote RPC client disassociated

I have a simple Glue job where I use PySpark to read 14 million rows from RDS via JDBC and then write them to S3. The Glue output logs show that reading and creating the DataFrame is quick, but the write operation fails with this error:

error occurred while calling o89.save. Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, 10.150.85.95, executor 15): ExecutorLostFailure (executor 15 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

I have tried the following solutions:

One weird thing I observed: after I create the DataFrame by reading from JDBC, Spark keeps the entire DataFrame in one partition until I explicitly repartition. The read step itself completes without any error.

The same code completes in 5 minutes for 6 million rows, but fails for 14 million rows with the ExecutorLostFailure error. Digging deeper into the logs, I also sometimes see: 2023-01-22 10:36:52,972 WARN [allocator] glue.ExecutorTaskManagement (Logging.scala:logWarning(66)): executor task creation failed for executor 203, restarting within 15 secs. restart reason: Executor task resource limit has been temporarily hit..

Code:

def read_from_db():
    logger.info(f'Starts Reading Data from {DB_TABLE} table')
    start = time.perf_counter()
    filter_query = f'SELECT * FROM {DB_TABLE}'
    sql_query = '({}) as query'.format(filter_query)
    spark_df = (glueContext.read.format('jdbc')
                .option('driver', 'org.postgresql.Driver')
                .option('url', JDBC_URL)
                .option('dbtable', sql_query)
                .option('user', DB_USERS)
                .option('password', DB_PASSWORD)
                .load()
                )
    logger.info(f'Count of records in DB is {spark_df.count()}')  # count() is the first action, so it triggers the actual JDBC read
    end = time.perf_counter()
    logger.info(f'Elapsed time for reading records from {DB_TABLE} table = {end - start:0.4f} seconds')
    logger.info(f'Finished Reading Data from {DB_TABLE} table')
    logger.info(f"Total no. of partitions - {spark_df.rdd.getNumPartitions()}")

    # def write_to_s3(spark_df_rep):
        # S3_PATH = (
        #     f"{S3_BUCKET}/all-entities-update/{date}/{cur_time}"
        # )
    #     spark_df_rep.write.format("csv").option("header", "true").save(S3_PATH)
    spark_df = spark_df.repartition(20)
    logger.info(f"Completed Repartitioning. Total no. of partitions - {spark_df.rdd.getNumPartitions()}")
    # spark_df.foreachPartition(write_to_s3)

    # spark_dynamic_frame = DynamicFrame.fromDF(spark_df, glueContext, "spark_dynamic_frame")
    # logger.info("Conversion to DynamicFrame complete")
    # glueContext.write_dynamic_frame.from_options(
    #     frame=spark_dynamic_frame,
    #     connection_type="s3",
    #     connection_options={"path": S3_PATH},
    #     format="csv"
    # )

    S3_PATH = (
            f"{S3_BUCKET}/all-entities-update/{date}/{cur_time}"
        )
    spark_df.write.format("csv").option("header", "true").save(S3_PATH)
    return

Upvotes: 6

Views: 8262

Answers (2)

Vijeth Kashyap
Vijeth Kashyap

Reputation: 233

I understood this happened because one executor ran out of memory, and increasing workers doesn't help: 1 worker → 1 executor → 2 DPUs, so even the maximum configuration with G.2X workers wasn't enough. The issue arose because the data was skewed. All rows in my database were similar except for 2 of the 13 columns, so PySpark couldn't distribute them across partitions and tried to load all my rows into a single partition.

So increasing workers/executors was of no help.

I solved this by loading the data into different partitions manually. Spark really was keeping everything in one partition (I verified this), and even adding a repartition afterwards didn't help.

I was getting the error while writing, not while reading, which was the cause of the confusion. But the actual issue was with the read: the read is lazy and is only triggered when the write (the first action) is called, so the failure surfaced at the write step:

From other SO answers:

Spark reads the data only when an action is applied; since you are just reading and writing to S3, the data is read when the write is triggered.

Spark is not optimized for bulk reads from an RDBMS, as by default it establishes only a single connection to the database.

Write the data in Parquet format, in parallel.

Also see: Databricks Spark Pyspark RDD Repartition - "Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues."

Manually partition for skewed data

I added a temporary column called RNO (row number), which is used as the partitionColumn to split the data into partitions; a partitionColumn has to be an int or datetime column. After the job is done, I drop this RNO column, either in the job itself or manually.

I had to read 14 million records from the RDBMS and write them to S3 so that each file holds around 200k records. This is where upperBound, lowerBound and numPartitions come in, along with the partitionColumn. I ran with upperBound = 14,000,000, lowerBound = 1 and numPartitions = 70 to check that each file gets about 200k records (upperBound/numPartitions - lowerBound/numPartitions). It created 65 files and the job completed successfully within 10 minutes.

filter_query = f'select ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS RNO, * from {DB_TABLE}'
sql_query = '({}) as query'.format(filter_query)  
spark_df = (spark.read.format('jdbc')
            .option('driver', 'org.postgresql.Driver')
            .option('url', JDBC_URL)
            .option('dbtable', sql_query)
            .option('user', DB_USERS)
            .option('password', DB_PASSWORD)
            .option('partitionColumn','RNO')
            .option('numPartitions',70)
            .option('lowerBound',1)
            .option('upperBound',14000000)
            .load()
            )
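For intuition, the way Spark turns lowerBound/upperBound/numPartitions into per-partition WHERE clauses can be sketched in plain Python. This is a simplified approximation of Spark's JDBCRelation.columnPartition logic (the real code also clamps strides, handles overflow, etc.), not the exact implementation:

```python
# Simplified sketch of how Spark's JDBC source splits [lowerBound, upperBound]
# into numPartitions range predicates over the partition column.
def jdbc_partition_clauses(lower, upper, num_partitions, column="RNO"):
    # Spark computes the stride with integer division on each bound
    stride = upper // num_partitions - lower // num_partitions
    clauses = []
    current = lower
    for i in range(num_partitions):
        if i == 0:
            # The first partition also picks up NULLs in the partition column
            clauses.append(f"{column} < {current + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # The last partition is open-ended, so rows above upperBound are not lost
            clauses.append(f"{column} >= {current}")
        else:
            clauses.append(f"{column} >= {current} AND {column} < {current + stride}")
        current += stride
    return stride, clauses

stride, clauses = jdbc_partition_clauses(1, 14_000_000, 70)
# Each of the 70 partitions covers a stride of 200,000 row numbers,
# matching the ~200k records per output file described above.
```

Note that lowerBound and upperBound only shape the split; rows whose partition-column value falls outside the range still land in the first or last partition rather than being filtered out.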

Additional references: https://blog.knoldus.com/understanding-the-working-of-spark-driver-and-executor/

Upvotes: 2

Oddys

Reputation: 116

In many cases this rather cryptic error message signals an OOM. Setting spark.task.cpus to a value greater than the default of 1 (up to 8, which is the number of cores on a G.2X worker for Glue version 3 or higher) helped me. This effectively increases the amount of memory a single Spark task gets, at the cost of a few cores sitting idle.
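For reference, in a Glue job this kind of Spark setting is typically passed through the job's special `--conf` job parameter (a sketch of the console configuration; verify against your Glue version's documentation):

```
Key:   --conf
Value: spark.task.cpus=2
```

With 8 cores per G.2X executor and spark.task.cpus=2, at most 4 tasks run concurrently per executor, so each task gets roughly twice its usual share of executor memory.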

Upvotes: 1
