Mysterious Otter
Mysterious Otter

Reputation: 4049

Pyspark reduce will take in some functions, not others

I'm running Python 2.7 on a jupyter notebook. I have the following pyspark RDD object:

file_rdd.collect()

[(u'Jane', 2),
 (u'Jane', 1),
 (u'Pete', 20),
 (u'Tyler', 3),
 (u'Duncan', 4),
 (u'Yuki', 5),
 (u'Duncan', 6),
 (u'Duncan', 4),
 (u'Duncan', 5)]

When I run file_rdd.values().reduce(max), I get 20. However, when I run file_rdd.values().reduce(sum), I get an error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-61-eb9dd2b679d4> in <module>()
----> 1 file_rdd.values().reduce(sum)

/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.pyc in reduce(self, f)
    800             yield reduce(f, iterator, initial)
    801 
--> 802         vals = self.mapPartitions(func).collect()
    803         if vals:
    804             return reduce(f, vals)

/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.pyc in collect(self)
    774         """
    775         with SCCallSiteSync(self.context) as css:
--> 776             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    777         return list(_load_from_socket(port, self._jrdd_deserializer))
    778 

/Users/zacharythomas/anaconda/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/Users/zacharythomas/anaconda/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 30.0 failed 1 times, most recent failure: Lost task 1.0 in stage 30.0 (TID 59, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.py", line 800, in func
    yield reduce(f, iterator, initial)
TypeError: 'int' object is not iterable

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.py", line 800, in func
    yield reduce(f, iterator, initial)
TypeError: 'int' object is not iterable

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

Why is that? When I run file_rdd.values().reduce(lambda x, y: x+y) I get the desired result 50.

Someone suggested to me this is because sum needs an iterable object, whereas .reduce wants binary operations. That doesn't explain why max works though, right?

Upvotes: 0

Views: 1799

Answers (1)

Mariusz
Mariusz

Reputation: 13936

Actually, "someone" from the last paragraph is quite right ;-)

reduce wants binary operations and max is one of them:

>>> max(1,2)
2

but sum is not:

>>> sum(1,2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'int' object is not iterable

By looking into operator module documentation you can find add binary operator:

>>> from operator import add
>>> add(1,2)
3
>>> file_rdd.values().reduce(add)
50

Upvotes: 1

Related Questions