Neha patel

Reputation: 143

Job cancelled because SparkContext was shut down

While running my Spark program in a Jupyter notebook I got the error "Job cancelled because SparkContext was shut down". I am using Spark without Hadoop. The same program gave output earlier, but now it shows this error. Any idea why the error might have occurred?

My code is:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("Musical_Instruments_5.json")
pd = df.select(df['asin'], df['overall'], df['reviewerID'])

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

indexer = [StringIndexer(inputCol=column, outputCol=column + "_index")
           for column in list(set(pd.columns) - set(['overall']))]

pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(pd).transform(pd)
transformed.show()

(training, test) = transformed.randomSplit([0.8, 0.2])
als = ALS(maxIter=30, regParam=0.09, rank=25,
          userCol="reviewerID_index", itemCol="asin_index", ratingCol="overall",
          coldStartStrategy="drop", nonnegative=True)
model = als.fit(training)

This is the point where it gives the error:

    Py4JJavaError                             Traceback (most recent call last)
<ipython-input-14-2e31692d867d> in <module>()
      1 #Fit ALS model to training data
----> 2 model=als.fit(training)

C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
    286 
    287     def _fit(self, dataset):
--> 288         java_model = self._fit_java(dataset)
    289         model = self._create_model(java_model)
    290         return self._copyValues(model)

C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
    283         """
    284         self._transfer_params_to_java()
--> 285         return self._java_obj.fit(dataset._jdf)
    286 
    287     def _fit(self, dataset):

C:\spark\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

C:\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(
    Py4JJavaError: An error occurred while calling o132.fit.
    : org.apache.spark.SparkException: Job 11 cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1841)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1754)
        at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1931)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1360)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1930)
        at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:573)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
        at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1030)
        at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
        at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:568)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)

Upvotes: 6

Views: 10387

Answers (1)

Neha patel

Reputation: 143

This problem is solved now. I had to set a checkpoint directory, because the number of iterations for training was more than 20. The code for setting the checkpoint directory is:

sc.setCheckpointDir("path to directory")  # call on the active SparkContext instance, not the class
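
For context, here is a minimal sketch of how the fix fits into the code from the question. The checkpoint path ("checkpoint_dir") is a placeholder, and the ALS settings simply mirror the ones above; this assumes the notebook's existing sc and the training DataFrame:

from pyspark.ml.recommendation import ALS

# Set a checkpoint directory on the active SparkContext so the iterative ALS job
# can periodically truncate its RDD lineage.
sc.setCheckpointDir("checkpoint_dir")

als = ALS(maxIter=30, regParam=0.09, rank=25,
          userCol="reviewerID_index", itemCol="asin_index", ratingCol="overall",
          coldStartStrategy="drop", nonnegative=True,
          checkpointInterval=10)  # checkpoint every 10 iterations (the pyspark.ml default)
model = als.fit(training)

With a checkpoint directory set, the lineage built up over 30 ALS iterations is cut back at each checkpoint instead of growing until the job fails and the SparkContext shuts down.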

Upvotes: 3
