Abhishek
Abhishek

Reputation: 33

spark pyspark mllib model - when prediction rdd is generated using map, it throws exception on collect()

I am using spark 1.2.0 (cannot upgrade as I dont have control over it). I am using mllib to build a model

points = labels.zip(tfidf).map(lambda t: LabeledPoint(t[0], t[1] ))
train_data, test_data = points.randomSplit([0.6, 0.4], 17)

iterations = 3
model = LogisticRegressionWithSGD.train(train_data, iterations)

labelsAndPreds = test_data.map(lambda p: (p.label, model.predict(p.features)) )
print("labels = "+str(labelsAndPreds.collect()))

When I run this code I get a NullPointerException on collect(). Infact any operation on the predicted data result throws this exception.

15/08/26 04:02:43 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 26, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:43 INFO BlockManagerInfo: Added broadcast_15_piece0 in memory on dojo3s10118.rtp1.hadoop.fmr.com:41145 (size: 9.6 KB, free: 529.8 MB)
15/08/26 04:02:43 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on dojo3s10118.rtp1.hadoop.fmr.com:41145 (size: 68.0 B, free: 529.8 MB)
15/08/26 04:02:43 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 26, dojo3s10118.rtp1.hadoop.fmr.com): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

15/08/26 04:02:43 INFO TaskSetManager: Starting task 0.1 in stage 17.0 (TID 27, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.1 in stage 17.0 (TID 27) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 1]
15/08/26 04:02:44 INFO TaskSetManager: Starting task 0.2 in stage 17.0 (TID 28, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.2 in stage 17.0 (TID 28) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 2]
15/08/26 04:02:44 INFO TaskSetManager: Starting task 0.3 in stage 17.0 (TID 29, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.3 in stage 17.0 (TID 29) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 3]
15/08/26 04:02:44 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job
15/08/26 04:02:44 INFO YarnClientClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool
15/08/26 04:02:44 INFO YarnClientClusterScheduler: Cancelling stage 17
15/08/26 04:02:44 INFO DAGScheduler: Job 8 failed: collect at /home/a560975/spark-exp/./ml-py-exp-2.py:102, took 0.209401 s
Traceback (most recent call last):
  File "/home/a560975/spark-exp/./ml-py-exp-2.py", line 102, in <module>
    print("labels = "+str(labelsAndPreds.collect()))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o118.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 29, dojo3s10118.rtp1.hadoop.fmr.com
): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)

If instead of doing a test_data.map(lambda p: (p.label, model.predict(p.features)) ) I do the following

for lp in test_data.collect():
    print("predicted = "+str(model.predict(lp.features)))

Then the prediction does not throw any exception, but this is not parallel. Why do I get the exception when I try to do model prediction by map function ? How do I get past it ?

I have tried sc.broadcast(model) to broadcast the model but still I see the same problem. Please help.

Upvotes: 1

Views: 1131

Answers (1)

shouwangcc
shouwangcc

Reputation: 1

If you used Python ,The reason is that “In Python, predict cannot currently be used within an RDD transformation or action. Call predict directly on the RDD instead.”.

Upvotes: 0

Related Questions