Jander

Reputation: 61

Running simple PySpark examples fails

We're getting started with Spark (using PySpark) and we're running into problems in a VMware ESX 5.5 environment, with Ubuntu Server 14.04 LTS virtual machines running Java version "1.8.0_45".

Running a simple sc.parallelize(['2', '4']).collect() results in this:

15/07/28 10:11:42 INFO SparkContext: Starting job: collect at <stdin>:1
15/07/28 10:11:42 INFO DAGScheduler: Got job 0 (collect at <stdin>:1) with 2 output partitions (allowLocal=false)
15/07/28 10:11:42 INFO DAGScheduler: Final stage: ResultStage 0(collect at <stdin>:1)
15/07/28 10:11:42 INFO DAGScheduler: Parents of final stage: List()
15/07/28 10:11:42 INFO DAGScheduler: Missing parents: List()
15/07/28 10:11:42 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:396), which has no missing parents
15/07/28 10:11:42 INFO TaskSchedulerImpl: Cancelling stage 0
15/07/28 10:11:42 INFO DAGScheduler: ResultStage 0 (collect at <stdin>:1) failed in Unknown s
15/07/28 10:11:42 INFO DAGScheduler: Job 0 failed: collect at <stdin>:1, took 0,058933 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/spark/python/pyspark/rdd.py", line 745, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/spark/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:422)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:884)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

We found this JIRA issue describing the same behavior: https://issues.apache.org/jira/browse/SPARK-9089

Any idea what's happening, or what we could try?

Upvotes: 2

Views: 1161

Answers (1)

Jander

Reputation: 61

As stated in the issue:

We faced this same problem and, after some digging and a good deal of luck, we found the origin of the issue.

It is caused because snappy-java extracts its native library to java.io.tmpdir (/tmp by default) and sets the executable flag on the extracted file. If /tmp is mounted with the "noexec" option, snappy-java cannot set the executable flag and raises an exception. See the SnappyLoader.java code. (A quick way to check this on a given machine is sketched after the quote.)

We fixed the issue by mounting /tmp without the "noexec" option.

Sean Owen: If you want to reproduce the issue, mount /tmp with the "noexec" option, or set java.io.tmpdir to a directory mounted with "noexec".

Maybe it would be better if Spark set the property org.xerial.snappy.tempdir to the value of spark.local.dir, but nothing prevents spark.local.dir from being mounted with "noexec" as well.
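
To confirm that this is what is happening on a given machine, you can look at the mount options of the JVM temp directory. A minimal sketch, assuming a Linux host and that java.io.tmpdir has not been overridden (so it defaults to /tmp):

    # Quick diagnostic sketch (Linux only): is the directory snappy-java extracts
    # its native library to sitting on a filesystem mounted with "noexec"?
    # Assumes java.io.tmpdir has not been overridden, so it defaults to /tmp.
    import os

    def mount_options(path):
        """Return (mount point, options) for the filesystem containing path."""
        path = os.path.realpath(path)
        best_mount, best_opts = "", ""
        with open("/proc/mounts") as mounts:
            for line in mounts:
                _, mount_point, _, options = line.split()[:4]
                prefix = mount_point.rstrip("/") + "/"
                # Keep the longest mount point that is a prefix of the path.
                if (path == mount_point or path.startswith(prefix)) and len(mount_point) > len(best_mount):
                    best_mount, best_opts = mount_point, options
        return best_mount, best_opts.split(",")

    mount_point, options = mount_options("/tmp")
    print("/tmp lives on %s with options %s" % (mount_point, options))
    if "noexec" in options:
        print("noexec is set: snappy-java will not be able to run its extracted native library from here")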

Removing noexec from the /tmp mount point fixed the issue for us. If remounting /tmp is not an option in your environment, another thing you could try is sketched below.
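
An untested alternative, based on the org.xerial.snappy.tempdir property mentioned above, is to point the JVM temp directory and snappy-java's extraction directory at a path on an exec-mounted filesystem. The /opt/spark/tmp path below is only a placeholder. Note that driver-side JVM options generally have to be supplied before the driver JVM starts (e.g. in spark-defaults.conf or via spark-submit --driver-java-options), so setting them from code like this mainly affects the executors:

    # Hypothetical workaround sketch: keep /tmp mounted noexec and redirect the
    # temp directories Spark and snappy-java use to an exec-mounted path instead.
    from pyspark import SparkConf, SparkContext

    exec_tmp = "/opt/spark/tmp"  # placeholder: any directory on a filesystem mounted without noexec

    jvm_opts = "-Djava.io.tmpdir={0} -Dorg.xerial.snappy.tempdir={0}".format(exec_tmp)

    conf = (SparkConf()
            .set("spark.executor.extraJavaOptions", jvm_opts)  # picked up when executors launch
            .set("spark.driver.extraJavaOptions", jvm_opts)    # only effective if the driver JVM has not started yet
            .set("spark.local.dir", exec_tmp))

    sc = SparkContext(conf=conf)
    print(sc.parallelize(['2', '4']).collect())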

Upvotes: 1
