Reputation: 4445
I'm newbie to pyspark
, last day I drew crime data to map and it worked fine, today I'm facing this issue.
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 3.0 failed 1 times, most recent failure: Lost task 5.0 in stage 3.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException
this is full traceback
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-a4ce64abb6b1> in <module>()
43 gmap.scatter(t_lat, t_lng, '#8A15DE', size=40, marker=False)
44 gmap.draw('crimefile.html')
---> 45 init()
<ipython-input-2-a4ce64abb6b1> in init()
34
35 gmap = gmplot.GoogleMapPlotter(41.881832, -87.623177, 16).from_geocode('Chicago')
---> 36 lat = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015') .map(lambda x:float(x.Latitude)).collect()
37 lng = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015') .map(lambda x:float(x.Longitude)).collect()
38
/Users/Mubin/Spark/python/pyspark/rdd.pyc in collect(self)
807 """
808 with SCCallSiteSync(self.context) as css:
--> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810 return list(_load_from_socket(port, self._jrdd_deserializer))
811
/Users/Mubin/Spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/Mubin/Spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/Mubin/Spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 3.0 failed 1 times, most recent failure: Lost task 5.0 in stage 3.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-2-a4ce64abb6b1>", line 36, in <lambda>
ValueError: could not convert string to float:
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-2-a4ce64abb6b1>", line 36, in <lambda>
ValueError: could not convert string to float:
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Here is my code.
import csv
from StringIO import StringIO
from collections import namedtuple
from gmplot import *
Crime = ''
def loadData():
sc = SparkContext.getOrCreate()
filePath = '/Users/Mubin/SparkData/chicago.csv'
return sc.textFile(filePath)
def parse(row):
global Crime
reader = csv.reader(StringIO(row))
row = reader.next()
return Crime(*row)
def prepareHeaders(header):
return header.replace(' ', '_').replace('/', '_').split(',')
def createNamedTuple(header):
return namedtuple('Crime', header, verbose=False)
def init():
global Crime
chicago = loadData()
headers = chicago.first()
#return chicago.first()
woHeaders = chicago.filter(lambda x:x <> headers)
#prepare headers[remove spaces, slaches] and convert to list.
fields = prepareHeaders(headers)
Crime = createNamedTuple(fields)
#map header to tuples data to access properties as object.
parsedData = woHeaders.map(parse)
#return parsedData.take(1)
#return parsedData.map(lambda x:x.Primary_Type).countByValue()
#return parsedData.filter(lambda x:x.Primary_Type == 'BATTERY').map(lambda x:x.Year).countByValue()
gmap = gmplot.GoogleMapPlotter(41.881832, -87.623177, 16).from_geocode('Chicago')
lat = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015')\
.map(lambda x:float(x.Latitude)).collect()
lng = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015')\
.map(lambda x:float(x.Longitude)).collect()
t_lat = parsedData.filter(lambda x: x.Primary_Type == 'THEFT' and x.Year == '2015')\
.map(lambda x:float(x.Latitude)).collect()
t_lng = parsedData.filter(lambda x: x.Primary_Type == 'THEFT' and x.Year == '2015')\
.map(lambda x:float(x.Longitude)).collect()
gmap.scatter(lat, lng, '#DE1515', size=40, marker=False)
gmap.scatter(t_lat, t_lng, '#8A15DE', size=40, marker=False)
gmap.draw('crimefile.html')
init()
if I uncomment any return line, it just returns data fine, but I'm unable to draw maps anymore.
Thanks.
Upvotes: 3
Views: 14684
Reputation: 36
The error you're getting is
ValueError: could not convert string to float
My best guess is that you're trying to convert string
values to float
without filtering empty values(I can't find it in your code). So in your code, do something like this for both Latitude
and Longitude
.
lambda x: x.Primary_Type == 'NARCOTICS' and x.Latitude != ''
Hope this will help.
Upvotes: 2