Escachator
Escachator

Reputation: 1882

Reading large file in Spark issue - python

I have spark installed in local, with python, and when running the following code:

data=sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
data.first()

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-fca93c6aedeb> in <module>()
----> 1 data.first()

C:\Spark\python\pyspark\rdd.pyc in first(self)
   1313         ValueError: RDD is empty
   1314         """
-> 1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

C:\Spark\python\pyspark\rdd.pyc in take(self, num)
   1295 
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298 
   1299             items += res

C:\Spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941 

C:\Anaconda2\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
   1024         answer = self.gateway_client.send_command(command)
   1025         return_value = get_return_value(
-> 1026             answer, self.gateway_client, self.target_id, self.name)
   1027 
   1028         for temp_arg in temp_args:

C:\Spark\python\pyspark\sql\utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

C:\Anaconda2\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    314                 raise Py4JJavaError(
    315                     "An error occurred while calling {0}{1}{2}.\n".
--> 316                     format(target_id, ".", name), value)
    317             else:
    318                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

I am sure the path is correct, as I have tried with other files in the same folder. I think the issue is with the size of the file, which is around 3.4 gigabytes.

Any help, please?

Upvotes: 7

Views: 4282

Answers (2)

Maviles
Maviles

Reputation: 3469

As seen in the spark documentation the default maximum values for the memory usage are set to 1GB.

You can see the defaults in spark configuration file, if in linux might be located in:

/etc/spark/conf/spark-defaults.conf

under the lines

spark.driver.memory
spark.executor.memory

But, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point.

When lauching spark master/slave you shoul add the memory set arguments:

ex: ./sbin/start-master.sh --memory 2G

Upvotes: 2

KartikKannapur
KartikKannapur

Reputation: 983

Whether you are using Spark in the standalone mode or the cluster mode, the spark.driver.memory and spark.executor.memory defaults to 1GB of memory. You can add more memory to both the driver and executors by changing this configuration while starting your Jupyter notebook or in the Spark Conf file. With this you should be able to read the 3.4GB CSV file, provided you have the necessary RAM on your machine.

Upvotes: 5

Related Questions