Reputation: 4055
I'm new to Spark and trying to figure out how the pipe method works. I have the following code in Scala:
sc.textFile(hdfsLocation).pipe("preprocess.py").saveAsTextFile(hdfsPreprocessedLocation)
The values hdfsLocation and hdfsPreprocessedLocation are fine. As proof, the following command works from the command line:
hadoop fs -cat hdfsLocation/* | ./preprocess.py | head
When I run the above Spark code I get the following errors:
14/11/25 09:41:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Cannot run program "preprocess.py": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041)
at org.apache.spark.rdd.PipedRDD.compute(PipedRDD.scala:119)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:135)
at java.lang.ProcessImpl.start(ProcessImpl.java:130)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022)
... 12 more
To solve this for Hadoop streaming I would just use the --files option, so I tried the same thing for Spark. I started Spark with the following command:
bin/spark-shell --files ./preprocess.py
but that gave the same error.
I couldn't find a good example of using Spark with an external process via pipe, so I'm not sure if I'm doing this correctly. Any help would be greatly appreciated.
Thanks
Upvotes: 4
Views: 7697
Reputation: 4055
I'm not sure if this is the correct answer, so I won't finalize this, but it appears that the file paths are different when running Spark in local and cluster mode. When running Spark without --master, the paths in the pipe command are relative to the local machine. When running Spark with --master, the paths in the pipe command are ./
UPDATE: This actually isn't correct. I was using SparkFiles.get() to get the file name. It turns out that when calling .pipe() on an RDD, the command string is evaluated on the driver and then passed to the worker. Because of this, SparkFiles.get() is not the appropriate way to get the file name. The file name should be ./ because SparkContext.addFile() should put that file in ./ relative to where each worker is run from. But I'm so sour on .pipe now that I've taken .pipe out of my code entirely in favor of .mapPartitions in combination with a PipeUtils object that I wrote here. This is actually more efficient because I only have to incur the script startup costs once per partition instead of once per example.
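For anyone who wants to try the .mapPartitions route, here is a minimal sketch of the idea. It is not the PipeUtils object mentioned above: the names PipeSketch and pipeThrough are made up for illustration, and the code uses only the JDK's ProcessBuilder, so it assumes nothing about Spark itself. It starts the command once, feeds it every record of the iterator on stdin, and returns the command's stdout lines as an iterator.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}

// Hypothetical helper (not the author's PipeUtils): pipes all records of one
// iterator through a single external process.
object PipeSketch {
  def pipeThrough(command: Seq[String], records: Iterator[String]): Iterator[String] = {
    // Start the external command once; its stderr goes to this JVM's stderr.
    val process = new ProcessBuilder(command: _*)
      .redirectError(ProcessBuilder.Redirect.INHERIT)
      .start()

    // Write stdin on a separate thread so that a full stdout buffer in the
    // child cannot block us while we are still writing input.
    val writer = new Thread(new Runnable {
      def run(): Unit = {
        val out = new PrintWriter(process.getOutputStream)
        try records.foreach(r => out.println(r))
        finally out.close() // closing stdin signals end of input to the child
      }
    })
    writer.setDaemon(true)
    writer.start()

    // Lazily read the child's stdout line by line until it closes the stream.
    // Note: this sketch does not check the process exit code.
    val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```

With the variables from the question, the call would presumably look like sc.textFile(hdfsLocation).mapPartitions(lines => PipeSketch.pipeThrough(Seq("./preprocess.py"), lines)).saveAsTextFile(hdfsPreprocessedLocation), assuming the script has been shipped to each worker's working directory (for example via SparkContext.addFile()) and is executable there.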
Upvotes: 2