Reputation: 143
While running my spark program in jupyter notebook I got the error "Job cancelled because SparkContext was shut down".I am using spark without hadoop.The same program gave output earlier but now showing error.Any idea why would the error must have occured.
My code is :
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.json("Musical_Instruments_5.json")
pd=df.select(df['asin'],df['overall'],df['reviewerID'])
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for
column in list(set(pd.columns)-set(['overall'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(pd).transform(pd)
transformed.show()
(training,test)=transformed.randomSplit([0.8, 0.2])
als=ALS(maxIter=30,regParam=0.09,rank=25,userCol="reviewerID_index",itemCol="asin_index",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)
This is the point where it gives error.
Py4JJavaError Traceback (most recent call last)
<ipython-input-14-2e31692d867d> in <module>()
1 #Fit ALS model to training data
----> 2 model=als.fit(training)
C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
286
287 def _fit(self, dataset):
--> 288 java_model = self._fit_java(dataset)
289 model = self._create_model(java_model)
290 return self._copyValues(model)
C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
283 """
284 self._transfer_params_to_java()
--> 285 return self._java_obj.fit(dataset._jdf)
286
287 def _fit(self, dataset):
C:\spark\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
C:\spark\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o132.fit.
: org.apache.spark.SparkException: Job 11 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1841)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1754)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1931)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1360)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1930)
at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:573)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1030)
at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:568)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Upvotes: 6
Views: 10387
Reputation: 143
This problem is solved now.I have to create a checkpoint directory as number of iterations was more than 20 for training. The code for creating checkpoint directory is:
SparkContext.setCheckpointDir("path to directory")
Upvotes: 3