Reputation: 923
I'm trying to run an ALS model on my PySpark DataFrame and I keep running into the same error.
Here's my Spark config:
spark_config["spark.executor.memory"] = "32G"
spark_config["spark.executor.memoryOverhead"] = "20G"
spark_config["spark.executor.cores"] = "32"
spark_config["spark.driver.memory"] = "32G"
# spark_config["spark.shuffle.memoryFraction"] = "0"
# Executor config
spark_config["spark.dyamicAllocation.enable"] = "true"
spark_config["spark.dynamicAllocation.minExecutors"] = "100"
spark_config["spark.dynamicAllocation.maxExecutors"] = "300"
Here's my model training code:
from pyspark.ml.recommendation import ALS

df = spark.read.parquet('file.parquet')
df = df.filter(df.item_id.isNotNull())  # drop rows with a null item id
X_train, X_test = df.randomSplit([0.8, 0.2])
als = ALS(userCol="cid_int", itemCol="item_id", ratingCol="score", rank=10, maxIter=10, seed=0)
model = als.fit(X_train)
Here's my error:
Py4JJavaError: An error occurred while calling o194.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 160, p13explorpdp01-sw-1zwh.c.wmt-bfdms-p13expprod.internal, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Slave lost
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1914)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1913)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1913)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:948)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2147)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2096)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2085)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2076)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2097)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2141)
at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:932)
at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:676)
at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:185)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:658)
at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:569)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
I have tried adjusting my Spark config with different parameters, but nothing helps.
Upvotes: 1
Views: 2895
Reputation: 2043
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 160, p13explorpdp01-sw-1zwh.c.wmt-bfdms-p13expprod.internal, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Slave lost
Your Spark application fails at a very early stage. Are you running Spark on YARN? The error shows that one of your tasks failed, which caused your executor to exit.
It seems the job fails before model training even starts, most likely during the data transformation part. If only this task fails while the other tasks in the same stage succeed, check whether your data is skewed across partitions (see the sketch below). If most of the tasks in the same stage fail, you should instead increase the memory available to your Spark application.
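A minimal sketch of one way to check for skew, assuming X_train is the DataFrame passed to als.fit (spark_partition_id is a built-in PySpark SQL function):
from pyspark.sql import functions as F

# Count rows per partition; a handful of partitions with far more rows
# than the rest points to skew.
partition_counts = (
    X_train
    .withColumn("partition_id", F.spark_partition_id())
    .groupBy("partition_id")
    .count()
)
partition_counts.orderBy(F.desc("count")).show(10)
If the counts are badly unbalanced, repartitioning before fitting (for example df.repartition(200, "item_id")) can help spread the load.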
Upvotes: 0