Rohini Mathur

Reputation: 441

Discard bad records and load only good records into a dataframe from a JSON file in PySpark

The API-generated JSON file looks like the one below. The format of the JSON is not correct. Can we discard the bad records and load only the good rows into a dataframe using PySpark?

{
"name": "PowerAmplifier",
"Component": "12uF Capacitor\n1/21Resistor\n3 Inductor In Henry\PowerAmplifier\n ",
"url": "https://www.onsemi.com/products/amplifiers-comparators/", 
"image": "https://www.onsemi.com/products/amplifiers-comparators/", 
"ThresholdTime": "48min", 
"MFRDate": "2019-05-08", 
"FallTime": "15Min", 
"description": "PowerAmplifier"
}

Code:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
      .master('local') \
      .appName('Fairchild') \
      .config('spark.executor.memory', '2g') \
      .getOrCreate()

df = spark.read.json("D:\\Fairchild\\component.json").select("name", "url", "image", "ThresholdTime", "MFRDate", "FallTime", "description")
#df.show()
df.registerTempTable("data")
spark.sql("select * from data").show()

Error:

Traceback (most recent call last):
  File "C:/Users/RAJEE/PycharmProjects/onsemi/test.py", line 24, in <module>
    result = spark.sql("select * from data")
  File "C:\spark\python\pyspark\sql\session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "C:\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
  File "C:\spark\python\pyspark\sql\utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'java.lang.RuntimeException: java.lang.RuntimeException:

Please share your suggestions.

Upvotes: 1

Views: 2568

Answers (1)

Salim

Reputation: 2178

Since Spark 2.3, queries against raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column, so the reader needs at least one good JSON record by default. I added 2 JSON records, 1 good and 1 bad, and then I could query the dataframe. You can also ignore all malformed records by using option("mode", "DROPMALFORMED").

You can use option("mode", "DROPMALFORMED") in your Python code too:

spark.read.option("mode", "DROPMALFORMED").json("D:\\Fairchild\\component.json").select("name", "url", "image", "ThresholdTime", "MFRDate", "FallTime", "description")
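
For completeness, here is a minimal self-contained version of the PySpark code (the file path is the one from the question; adjust it for your environment):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master('local') \
        .appName('Fairchild') \
        .getOrCreate()

    # DROPMALFORMED makes the JSON reader silently skip any line it cannot
    # parse, so only well-formed records reach the dataframe.
    df = spark.read \
        .option("mode", "DROPMALFORMED") \
        .json("D:\\Fairchild\\component.json") \
        .select("name", "url", "image", "ThresholdTime",
                "MFRDate", "FallTime", "description")

    df.show()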

JSON data - note that the "\P" in the first record's Component value is an invalid JSON escape, which is what makes that record malformed:

{"name": "PowerAmplifier","Component": "12uF Capacitor\n1/21Resistor\n3 Inductor In Henry\PowerAmplifier\n ","url": "https://www.onsemi.com/products/amplifiers-comparators/", "image": "https://www.onsemi.com/products/mplifiers-comparators/", "ThresholdTime": "48min", "MFRDate": "2019-05-08", "FallTime": "15Min", "description": "PowerAmplifier"}
{"name": "PowerAmplifier","Component": "good record","url": "https://www.onsemi.com/products/amplifiers-comparators/", "image": "https://www.onsemi.com/products/mplifiers-comparators/", "ThresholdTime": "48min", "MFRDate": "2019-05-08", "FallTime": "15Min", "description": "PowerAmplifier"}

Code to read it without ignoring malformed records. In this case you can filter out the bad records using the "_corrupt_record" column:

    val j = spark.read.json("/Users/msayed2/Documents/temp/test.txt")
    j.show()

Result

+-----------+--------+----------+-------------+--------------------+--------------+--------------------+--------------+--------------------+
|  Component|FallTime|   MFRDate|ThresholdTime|     _corrupt_record|   description|               image|          name|                 url|
+-----------+--------+----------+-------------+--------------------+--------------+--------------------+--------------+--------------------+
|       null|    null|      null|         null|{"name": "PowerAm...|          null|                null|          null|                null|
|good record|   15Min|2019-05-08|        48min|                null|PowerAmplifier|https://www.onsem...|PowerAmplifier|https://www.onsem...|
+-----------+--------+----------+-------------+--------------------+--------------+--------------------+--------------+--------------------+
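
The same filtering can be done from PySpark. Below is a minimal sketch, assuming the same input file as the question; note that since Spark 2.3 you cannot filter a raw JSON dataframe on "_corrupt_record" alone, so the parsed result is cached first:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.master('local').appName('Fairchild').getOrCreate()

    # Default (PERMISSIVE) mode: lines that cannot be parsed end up with all
    # data columns null and the raw text in the _corrupt_record column.
    df = spark.read.json("D:\\Fairchild\\component.json")

    # Since Spark 2.3, referencing only the internal corrupt record column of
    # a raw file-based dataframe is disallowed, so cache the parsed result.
    df.cache()

    good = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
    bad = df.filter(col("_corrupt_record").isNotNull())
    good.show()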

You can drop the malformed records entirely like this -

    val j = spark.read.option("mode", "DROPMALFORMED").json("/Users/msayed2/Documents/temp/test.txt")
    j.show()

The result no longer contains the malformed record:

+-----------+--------+----------+-------------+--------------+--------------------+--------------+--------------------+
|  Component|FallTime|   MFRDate|ThresholdTime|   description|               image|          name|                 url|
+-----------+--------+----------+-------------+--------------+--------------------+--------------+--------------------+
|good record|   15Min|2019-05-08|        48min|PowerAmplifier|https://www.onsem...|PowerAmplifier|https://www.onsem...|
+-----------+--------+----------+-------------+--------------+--------------------+--------------+--------------------+

Upvotes: 2
