Reputation: 789
I have a Hive table with ordinary scalar columns plus one column that holds JSON as a string. Take the list data below as an example:
l = [(12, '{"status":"200"}') , (13,'{"data":[{"status":"200","somecol":"300"},{"status":"300","somecol":"400"}]}')]
I want to infer schema of the string field and then want to query the JSON fields. I have referred to the solutions given in this answer:
but the attempts below to parse the JSON string into actual JSON fail with errors. I tried inferring the schema of the JSON using:
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
also tried:
new_df = sqc.read.json(df2.rdd.map(lambda r: r.json))
Both result in errors like:
ValueError: 'json' is not in list
AttributeError: json
**Below is my code:**
from pyspark.sql.functions import json_tuple,from_json,get_json_object
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import from_json, col, to_json, struct
import json
spark.version
spark = SparkSession.builder.getOrCreate()
sqc = SQLContext(spark)
l = [(12, '{"status":"200"}') , (13,'{"data":[{"status":"200","somecol":"300"},{"status":"300","somecol":"400"}]}')]
df = spark.createDataFrame(l,['pid','response'])
df.toPandas()
df2=df.select('response')
df2.toPandas()
df.printSchema()
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema #Failing
df2 = df.withColumn('json', from_json(col('response'), json_schema))
new_df = sqc.read.json(df.rdd.map(lambda r: r.json)) #Failing
Error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-58-b5b5c342aefa> in <module>
----> 1 json_schema = spark.read.json(df2.rdd.map(lambda row: row.json)).schema
2 #df2 = df.withColumn('json', from_json(col('response'), json_schema))
/usr/local/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding)
284 keyed._bypass_serializer = True
285 jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
--> 286 return self._df(self._jreader.json(jrdd))
287 else:
288 raise TypeError("path can be only string, list or RDD")
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1017.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 28, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
idx = self.__fields__.index(item)
ValueError: 'json' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 277, in func
for x in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<ipython-input-58-b5b5c342aefa>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
raise AttributeError(item)
AttributeError: json
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:438)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:419)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:405)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
idx = self.__fields__.index(item)
ValueError: 'json' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 277, in func
for x in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<ipython-input-58-b5b5c342aefa>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
raise AttributeError(item)
AttributeError: json
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
new_df = sqc.read.json(df2.rdd.map(lambda r: r.json))
new_df = sqc.read.json(df2.rdd.map(lambda r: r.json))
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-60-f44f1b4c98d9> in <module>
----> 1 new_df = sqc.read.json(df2.rdd.map(lambda r: r.json))
/usr/local/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding)
284 keyed._bypass_serializer = True
285 jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
--> 286 return self._df(self._jreader.json(jrdd))
287 else:
288 raise TypeError("path can be only string, list or RDD")
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1071.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 1 times, most recent failure: Lost task 0.0 in stage 29.0 (TID 29, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
idx = self.__fields__.index(item)
ValueError: 'json' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 277, in func
for x in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<ipython-input-60-f44f1b4c98d9>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
raise AttributeError(item)
AttributeError: json
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:438)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:419)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:405)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
idx = self.__fields__.index(item)
ValueError: 'json' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 277, in func
for x in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<ipython-input-60-f44f1b4c98d9>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
raise AttributeError(item)
AttributeError: json
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
2
df2.toPandas()
pid response json
0 12 {"status":"200"} (None,)
1 13 {"data":[{"status":"200","somecol":"300"},{"st... (None,)
Upvotes: 4
Views: 5813
Reputation: 31
new_df = sqc.read.json(df2.rdd.map(lambda r: r.json))
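The json in r.json is the name of the DataFrame column that contains the JSON string. Your DataFrame has no column named json (its columns are pid and response), which is why you get AttributeError: json; use r.response instead.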
Example:
df.printSchema()
root
 |-- date: string (nullable = true)
 |-- log: string (nullable = true)
json_schema = spark.read.json(df.rdd.map(lambda row: row.log)).schema
Upvotes: 2
Reputation: 28
I am a newbie to Spark myself. I use a shortcut to parse any JSON from strings. Instead of trying to infer the schema using map and lambda, simply take one of your sample JSON documents and save it somewhere Spark has access to, such as the local dev Linux box. Local Linux filesystem, HDFS, or S3, it doesn't matter.
Then infer the schema using PySpark's spark.read.json method. I have found this way to work perfectly.
# Try to load the JSON schema using a JSON file
jsondf = spark.read.json("/home/jovyan/work/diag.json")
jsondf.printSchema()
jsonschema = jsondf.schema
print(jsonschema)
lineleveldtlsdf= spark.read.json("/home/jovyan/work/lineleveldetails.json")
lineleveldtlsdf.printSchema()
#Once we get the Schema of the json, then use below code to create a new parsed json column in a new DF, if we want
df2 = df.withColumn('parsedjson', from_json(col('response'), jsonschema))
df2.toPandas()
susmitaghosh_kol@spark-dev-ce:~/notebooks$ cat diag.json {"diagnosis":[{"hdr_diagnosiscode":"2662","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":6,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"78469","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":5,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"30000","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":4,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"317","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":3,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"7812","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":2,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"72887","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":1,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"78097","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":1003,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"78097","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":1001,"hdr_poacode":"-97"},{"hdr_diagnosiscode":"4019","hdr_diagnosistypecode":"ICD-9","hdr_diagnosisnumber":7,"hdr_poacode":"-97"}]}
Upvotes: 1
Reputation: 32660
In your example, the 2 JSON strings do not have the same schema so which one is correct? If it's not the same schema in all rows you'll lose some data when parsing.
To parse that column you can first infer the schema from one JSON string (collect one value and pass it to schema_of_json). Something like this:
json_schema = schema_of_json(df.select(col("response")).take(1)[0].response)
df2 = df.withColumn('json', from_json(col('response'), json_schema))
df2.show()
Output: (assuming the correct schema is the one with pid=13)
+---+----------------------------------------------------------------------------+--------------------------+
|pid|response |json |
+---+----------------------------------------------------------------------------+--------------------------+
|13 |{"data":[{"status":"200","somecol":"300"},{"status":"300","somecol":"400"}]}|[[[300, 200], [400, 300]]]|
|12 |{"status":"200"} |[] |
+---+----------------------------------------------------------------------------+--------------------------+
Upvotes: 2