Reputation: 1845
I am getting an error while running PySpark in a Jupyter Notebook with Python 3.7, using the code below.
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
import pyspark as ps
conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")
sc = SparkContext('local')
sqlContext = SQLContext(sc)
from pyspark.mllib.linalg import Vector, Vectors
from nltk.stem.wordnet import WordNetLemmatizer
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
I am reading the CSV file with the following code:
datanew = sqlContext.read.format("csv") \
.options(header='true', inferschema='true') \
.load("C://Users//mypath//data.csv")
parts = datanew.rdd.map(lambda l: l.split(","))
datapysp = parts.map(lambda p: Row(uiid=p[0],title=(p[3].strip()),text=(p[4].strip())))
schemaString = "uiid title text"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
sqlContext.createDataFrame(datapysp, schema).show()
This is the error message I am receiving. The columns mentioned are UIID, Title, and Text.
Py4JJavaError: An error occurred while calling o74.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
at java.net.PlainSocketImpl.accept(Unknown Source)
at java.net.ServerSocket.implAccept(Unknown Source)
at java.net.ServerSocket.accept(Unknown Source)
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
I went through the answers provided here: Pyspark socket timeout exception after application running for a while, and tried changing the code as suggested:
import pyspark as ps
conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")
sc = ps.SparkContext('local[4]', '', conf=conf)
While running sc = ps.SparkContext('local[4]', '', conf=conf), I get the error Java gateway process exited before sending its port number.
I also tried the following, but I still get the same Accept timed out error:
parts = datanew.rdd.map(lambda l: l.split(","))
datapysp = parts.map(lambda p: Row(uiid=p[0],title=(p[3].strip()),text=(p[4].strip())))
schemaString = "uiid title text"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
sqlContext.createDataFrame(datapysp,
schema).show().config("sqlContext.executor.heartbeatInterval", "10000s")
I added this, but the error is still not resolved.
I would appreciate it if anyone could help me with this. I am using Windows 10 64-bit.
Upvotes: 10
Views: 8613
Reputation: 664
First of all, check your Spark version with this snippet:
from pyspark.sql import SparkSession

print("spark version =", SparkSession.builder.appName("test").getOrCreate().version)
and the PySpark version with pip on the command line:
pip show pyspark
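Equivalently, you can check it from inside the notebook (a quick check, not part of the original snippet):

import pyspark
print("pyspark version =", pyspark.__version__)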
If the PySpark version is greater than your Spark version, you can fall back to an older PySpark with this snippet:
import pkg_resources
pkg_resources.require("pyspark==[older_version]")
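For example (a sketch only; the version number below is hypothetical, and pkg_resources can only resolve it if that distribution is actually installed):

import pkg_resources
pkg_resources.require("pyspark==2.3.2")  # hypothetical older version; adjust to what you have installed
import pyspark
print(pyspark.__version__)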
Or install a newer version of Spark on your system.
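If you do install Spark manually on Windows, one possible way to point the Jupyter kernel at that installation is via environment variables (a sketch only; the paths below are hypothetical, and the findspark package is an extra assumption, not part of this answer):

import os
os.environ["SPARK_HOME"] = "C:\\spark\\spark-2.4.0-bin-hadoop2.7"  # hypothetical install path
os.environ["PYSPARK_PYTHON"] = "C:\\Python37\\python.exe"          # interpreter the Python workers should use

import findspark   # assumes findspark is pip-installed
findspark.init()   # adds the Spark install above to sys.path

import pyspark
print(pyspark.__version__)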
Upvotes: 1
Reputation: 3237
According to this website:
spark.executor.heartbeatInterval (default: 10s) - Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
Looking at that, I believe you have a problem with the spark.executor.heartbeatInterval setting in your code. I would suggest you increase spark.executor.heartbeatInterval.
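For example, here is a minimal sketch of how that setting could be applied, based on the conf already shown in the question (the values are only illustrative, and spark.network.timeout is kept larger than the heartbeat interval, as the Spark docs recommend):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setMaster("local[4]")
        .setAppName("sparK-mer")
        .set("spark.executor.heartbeatInterval", "3600s")  # illustrative value
        .set("spark.network.timeout", "7200s"))            # keep this larger than the heartbeat interval

sc = SparkContext(conf=conf)   # pass the conf when the context is created,
sqlContext = SQLContext(sc)    # rather than calling .config() after .show()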
Upvotes: 1