Reputation: 311
I am trying to parse JSON data in PySpark's map function. I am interested in extracting the field "fees": 481000 from the JSON data on line #21. If I do this in plain Python (that is, without PySpark), the following works:
import json
f = open("block_395545.json")
lines = f.read()
json_data = json.loads(lines)
fee_data = json_data["fees"]
print fee_data
But if I put this inside a map function as follows, it does not work:
def get_tx_fee(line):
    json_data = json.loads(line)
    fee_data = json_data["fees"]
    print fee_data
    return fee_data
lines_rdd = sc.textFile("file:///block_395545.json")
tx_fee = lines_rdd.map(get_tx_fee)
Any idea how to fix this?
Here is the code after incorporating @zero323's suggestion:
from pyspark import SparkConf, SparkContext
import json
conf = SparkConf().setMaster("local").setAppName("bitcoin_TransactionFee_calcultor")
sc = SparkContext(conf=conf)
content_rdd = (sc.wholeTextFiles("file:///home/ubuntu/unix_practice/spark-example/block_json_395545.txt")
.map(lambda kv: kv[0])
.map(json.loads)
)
content_rdd.take(10)
and here are the errors after running spark-submit:
Traceback (most recent call last):
File "/home/ubuntu/unix_practice/spark-example/bctxfee_text.py", line 48, in <module>
content_rdd.take(10)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1299, in take
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 916, in runJob
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1295, in takeUpToNumLeft
File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python2.7/json/decoder.py", line 384, in raw_decode
raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1295, in takeUpToNumLeft
File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python2.7/json/decoder.py", line 384, in raw_decode
raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
16/02/03 22:42:42 INFO SparkContext: Invoking stop() from shutdown hook
16/02/03 22:42:42 INFO SparkUI: Stopped Spark web UI at http://ec2-52-72-36-43.compute-1.amazonaws.com:4040
16/02/03 22:42:42 INFO DAGScheduler: Stopping DAGScheduler
16/02/03 22:42:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/02/03 22:42:42 INFO MemoryStore: MemoryStore cleared
16/02/03 22:42:42 INFO BlockManager: BlockManager stopped
16/02/03 22:42:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/02/03 22:42:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/02/03 22:42:42 INFO SparkContext: Successfully stopped SparkContext
16/02/03 22:42:42 INFO ShutdownHookManager: Shutdown hook called
16/02/03 22:42:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-8c82aefa-b0f4-4365-b7a9-cdb7468aed68/pyspark-47daf835-b869-48cc-9c3d-b836f2ff26b8
16/02/03 22:42:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-8c82aefa-b0f4-4365-b7a9-cdb7468aed68
Upvotes: 0
Views: 1875
Reputation: 330393
SparkContext.textFile splits data by line, so it won't work with multi-line JSON. If the files are relatively small, you can try SparkContext.wholeTextFiles instead:
(sc.wholeTextFiles("file:///block_395545.json")
.map(lambda kv: kv[1])
.map(json.loads))
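To tie this back to the question, here is a minimal end-to-end sketch (the path and the "fees" field name are taken from the question; the app name is a placeholder):

from pyspark import SparkConf, SparkContext
import json

conf = SparkConf().setMaster("local").setAppName("fees_extract")
sc = SparkContext(conf=conf)

fees_rdd = (sc.wholeTextFiles("file:///block_395545.json")  # yields (path, content) pairs
            .map(lambda kv: kv[1])                           # keep the file content, not the path
            .map(json.loads)                                 # parse each whole file as one JSON document
            .map(lambda doc: doc["fees"]))                   # extract the "fees" field

print(fees_rdd.collect())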
If not, you can try to leverage the document structure to create a custom Hadoop InputFormat, use a custom record delimiter, or fix the data after loading using methods that can access multiple records at once, such as mapPartitions / mapPartitionsWithIndex.
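As an illustration of the custom delimiter route, here is a sketch using the new Hadoop API (the blank-line delimiter is an assumption; adjust it to however records are actually separated in your data):

import json

# Hypothetical: records separated by a blank line instead of a single newline.
records_rdd = (sc.newAPIHadoopFile(
        "file:///block_395545.json",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": "\n\n"})
    .map(lambda kv: kv[1])          # drop the byte-offset key
    .filter(lambda s: s.strip())    # skip empty chunks
    .map(json.loads))               # parse each chunk as JSON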
Upvotes: 1