Alejandro A

Reputation: 1190

Pyspark, executors lost connectivity when performing a join

My mesos-spark cluster:

[screenshot: Mesos/Spark cluster overview]

Executors crash every time I try to do a .count() after a join; the count without the join works perfectly. I'm not sure why, but in the failed queries I see:

[screenshot: failed query details]

And in the executor logs:

[screenshot: executor logs]

I don't see a specific OOM issue, so what's the deal here? It seems to happen only when the join is performed.

I followed @busfighter's suggestions and set the DataFrames to StorageLevel.MEMORY_ONLY before joining, and reduced the number of partitions using coalesce(). Still the same error.
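
For reference, this is roughly the pattern that fails. A minimal, self-contained sketch, where df_a, df_b and some_key are placeholders for the real DataFrames and join column:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.getOrCreate()

# Tiny stand-ins for the real DataFrames used in the daily ETL
df_a = spark.createDataFrame([(1, "a"), (2, "b")], ["some_key", "left_val"])
df_b = spark.createDataFrame([(1, "x"), (2, "y")], ["some_key", "right_val"])

# Persist both sides in memory and reduce the partition count, as suggested
df_a = df_a.persist(StorageLevel.MEMORY_ONLY).coalesce(12)
df_b = df_b.persist(StorageLevel.MEMORY_ONLY).coalesce(12)

# Counting either side on its own works fine
df_a.count()

# The failure shows up here: the shuffle triggered by the join is where executors drop out
joined = df_a.join(df_b, on="some_key", how="inner")
joined.count()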

Edit 1

Tried all comments, nothing:

  1. Persisting the data in memory
  2. Repartitioning to 12 partitions (was 200); I should add that, after checking the Spark jobs web UI, the executors are never explicitly removed by Spark (Mesos) on my cluster
  3. Changing spark.sql.autoBroadcastJoinThreshold to a value roughly 20 times smaller than the default (see the sketch after this list)
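
Roughly how those settings were applied; a sketch assuming item 2 was done through the shuffle-partition setting (an explicit df.repartition(12) on each side would have the same effect on the join), and with an illustrative value for the broadcast threshold:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Shuffle into 12 partitions instead of the 200 default
spark.conf.set("spark.sql.shuffle.partitions", "12")

# Lower the broadcast-join threshold well below the 10 MB default
# (the exact value here is illustrative, not the one actually used)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(512 * 1024))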

Edit 2

At no point are the executors removed when the task fails; they just time out on the shuffle:

[screenshot: shuffle timeout errors]
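
For what it's worth, shuffle fetch timeouts are typically governed by spark.network.timeout (120s by default); a sketch of raising it, though I haven't verified that this helps here:

from pyspark import SparkConf

# Hypothetical tweak: give shuffle fetches more time before they are considered failed
# (spark.network.timeout defaults to 120s); not confirmed to fix the problem above
conf = (SparkConf()
        .setAppName('daily_etl')
        .set("spark.network.timeout", "600s"))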

Edit 3

Note that the data size is really small when it crashes. I'm feeling lost and can't find the executor logs to see whether it was killed because of OOM:

[screenshot: task data sizes at the time of the crash]

Edit 4

Some important notes:

Config used in PySpark:

conf = (SparkConf()
        .setAppName('daily_etl')
        .setMaster(XXXXX)
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .set('spark.mesos.executor.home','/opt/spark')
        )

spark = SparkSession.builder\
    .config(conf=conf) \
    .getOrCreate()
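
To double-check what the session actually picked up, I dump the effective configuration (this uses the spark session created above; getConf().getAll() is standard PySpark):

# Print the configuration the running session resolved, to confirm the
# Mesos and connector settings took effect
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    print(key, "=", value)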

Edit 5

Screenshot of the error:

[screenshot: error message]

Edit 6

Adding a screenshot of the Mesos UI:

[screenshot: Mesos UI]

Edit 7

I managed to narrow down the problem: for some reason the BlockManager is listening on localhost, so the other executors cannot connect:

[screenshot: BlockManager log showing it bound to localhost]

Not sure why, but I will create another question for it.
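
For completeness, the settings that normally control which address the driver side binds to and advertises are spark.driver.bindAddress and spark.driver.host (plus SPARK_LOCAL_IP on the executor hosts). A sketch of pinning them explicitly, although I haven't confirmed this is the fix; the hostname is a placeholder:

from pyspark import SparkConf

# Hypothetical workaround: bind on all interfaces but advertise a hostname
# the executors can actually reach ("driver-host.example.com" is a placeholder)
conf = (SparkConf()
        .setAppName('daily_etl')
        .set("spark.driver.bindAddress", "0.0.0.0")
        .set("spark.driver.host", "driver-host.example.com"))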

Upvotes: 1

Views: 1117

Answers (1)

John Doe

Reputation: 10213

Please try this:

conf = (SparkConf()
        .setAppName('daily_etl')
        .setMaster(XXXXX)
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .set("spark.mesos.executor.home", "/opt/spark")
        .set("spark.driver.memory", "16G")                  # more headroom for the driver
        .set("spark.executor.memory", "8G")                 # and for each executor
        .set("spark.sql.autoBroadcastJoinThreshold", "-1")  # disable automatic broadcast joins
        )

Maybe also do a repartition:

df = df.repartition(2000)

The value depends on your cluster.

Upvotes: 1
