Alexis Eggermont

Reputation: 8155

Add date field to RDD in Spark

I have a pretty simple RDD called STjoin, over which I map a simple function to extract the day from a string representing the date-time.

The code passes lazy evaluation without complaint, but if I run the last line (STjoinday.take(5), commented out below), I get an error.

import dateutil

def parsedate(x):
    try:
        # parse the date-time string in the second field, keep only the date
        dt = dateutil.parser.parse(x[1]).date()
    except:
        # fall back to a sentinel date if parsing fails
        dt = dateutil.parser.parse("01 Jan 1900 00:00:00").date()

    x.append(dt)
    return x

STjoinday = STjoin.map(lambda line: parsedate(line))
#STjoinday.take(5)

What is the problem here?

Long error traceback below:

15/04/27 22:14:02 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1147, in takeUpToNumLeft
    yield next(iterator)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/test3.py", line 72, in parsedate
    dt=dateutil.parser.parse("01 Jan 1900 00:00:00").date()
AttributeError: 'module' object has no attribute 'parser'

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/04/27 22:14:02 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/test3.py", line 79, in <module>
    STjoinday.take(5)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1152, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/context.py", line 770, in runJob
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1147, in takeUpToNumLeft
    yield next(iterator)
  File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/test3.py", line 72, in parsedate
    dt=dateutil.parser.parse("01 Jan 1900 00:00:00").date()
AttributeError: 'module' object has no attribute 'parser'

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Upvotes: 0

Views: 1484

Answers (2)

Alexis Eggermont

Reputation: 8155

As pointed out in other answers and comments, the problem is with how dateutil is imported. I found a way that works, even though I am not sure why the other approaches fail. Instead of the import used above:

from dateutil.parser import parse as parse_date

and then call:

dt=parse_date("01 Jan 1900 00:00:00").date()
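
Importing the submodule directly works here because import dateutil on its own does not load the dateutil.parser submodule. For completeness, a minimal sketch of the whole function with that import applied (the field index and fallback date come from the question; the bare except is narrowed to ValueError, which parse raises on unparseable input):

from dateutil.parser import parse as parse_date

def parsedate(x):
    try:
        # parse the date-time string in the second field, keep only the date
        dt = parse_date(x[1]).date()
    except ValueError:
        # fall back to a sentinel date when the string cannot be parsed
        dt = parse_date("01 Jan 1900 00:00:00").date()
    x.append(dt)
    return x

STjoinday = STjoin.map(parsedate)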

Upvotes: 1

ayan guha

Reputation: 1257

Looks like dateutil is not a standard Python package, so you need to distribute it to every worker node. Can you post what happens when you just import dateutil in a plain Python shell? Maybe you are missing an entry in PYTHONPATH.
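
If dateutil does turn out to be missing on the workers, one common way to ship it is to zip the package and hand it to Spark, either on the command line with spark-submit --py-files dateutil.zip test3.py or from the driver (a sketch; dateutil.zip is a hypothetical archive you build from the installed package):

from pyspark import SparkContext

sc = SparkContext(appName="STjoin-dates")
# ship the zipped package to every executor so worker processes can import it
sc.addPyFile("dateutil.zip")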

Upvotes: 0
