JohnAster

Reputation: 317

Spark from_json No Exception

I am working with Spark 2.1 (Scala 2.11).

I want to parse JSON-formatted strings, with a defined schema, from one DataFrame into another DataFrame. I have tried a few solutions, and the least expensive turns out to be the standard column function from_json. I tried an example (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-collection.html#from_json) with this function, but it is giving me unexpected results.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df = spark.read.text("testFile.txt")

df.show(false)

+----------------+
|value           |
+----------------+
|{"a": 1, "b": 2}|
|{bad-record     |
+----------------+


df.select(from_json(col("value"),
      StructType(List(
                  StructField("a",IntegerType),
                  StructField("b",IntegerType)
                ))
    )).show(false)


+-------------------+
|jsontostruct(value)|
+-------------------+
|[1,2]              |
|null               |
+-------------------+

This behavior is similar to mode: PERMISSIVE, which is not the default. By default, it is set to FAILFAST mode, meaning it should throw an exception whenever the input data and the enforced schema do not match.

I tried loading testFile.txt with DataFrameReader (JSON data source and FAILFAST mode) and successfully caught an exception.

spark.read.option("mode","FAILFAST").json("testFile.txt").show(false)

---
Caused by: org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {bad-record
---

Though the parsing mode is the same in both cases, why are the respective outputs so different?

Upvotes: 5

Views: 3636

Answers (3)

Pavel K

Reputation: 3618

To add to @user11022201's answer: it looks like the options argument can achieve the desired FAILFAST behavior. The code below is in PySpark and was tested with Spark 3.2.2.

import pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType

spark_session = pyspark.sql.SparkSession.builder.master("local[*]").appName("test").getOrCreate()

data = [
    {'value': '{"a": 1, "b": 2}'},
    {'value': '{bad-record'},
]

df = spark_session.createDataFrame(data)

schema = StructType([
    StructField("a", IntegerType()),
    StructField("b", IntegerType())
])

# If options are empty then the error does not happen and null values are added to the dataframe
# options = {}
options = {"mode": "FAILFAST"}

parsed_json_df = df.select(F.from_json(F.col("value"), schema, options))
parsed_json_df.show()

The result of the code above is an exception, which is the desired behavior:

org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1236)
    at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)

Upvotes: 1

stack0114106

Reputation: 8711

Note that you are reading the file as a text file and then converting it to JSON. By default, newline is the delimiter for text files, and if a line contains a valid JSON string, it will be converted correctly with the schema that you define in the from_json() method.

If a line is blank or contains invalid JSON text, you will get NULL.

Check this out:

val df = spark.read.text("in/testFile.txt")
println("Default show()")
df.show(false)

println("Using the from_json method ")
df.select(from_json(col("value"),
  StructType(List(
    StructField("a",IntegerType),
    StructField("b",IntegerType)
  ))
)).show(false)

when the in/testFile.txt is with below content,

{"a": 1, "b": 2 }

it prints

Default show()
+-----------------+
|value            |
+-----------------+
|{"a": 1, "b": 2 }|
+-----------------+

Using the from_json method 
+--------------------+
|jsontostructs(value)|
+--------------------+
|[1,2]               |
+--------------------+

when your input is with a blank line

{"a": 1, "b": 2 }
// Blank line

the result is

Default show()
+-----------------+
|value            |
+-----------------+
|{"a": 1, "b": 2 }|
|                 |
+-----------------+

Using the from_json method 
+--------------------+
|jsontostructs(value)|
+--------------------+
|[1,2]               |
|null                |
+--------------------+

Upvotes: 0

user11022201

Reputation: 56

That is expected behavior. from_json is a SQL function, and there is no concept of an exception (an intentional one, at least) at this level. If the operation fails, the result is undefined: NULL.

While from_json provides an options argument, which allows you to set JSON reader options, this behavior, for the reason mentioned above, cannot be overridden.

On a side note, the default mode for DataFrameReader is permissive.
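The null-on-failure contract described above can be sketched in plain Python. The `from_json_like` helper below is hypothetical (it is not part of any Spark API); like Spark's expression evaluator, it swallows the parse error and yields None rather than letting the exception propagate:

```python
import json

def from_json_like(value, keys):
    """Hypothetical sketch of from_json semantics: parse a JSON string
    against a fixed set of field names, returning None on any malformed
    input instead of raising, just as the SQL expression yields NULL."""
    try:
        obj = json.loads(value)
        return tuple(obj.get(k) for k in keys)
    except (json.JSONDecodeError, TypeError, AttributeError):
        return None

print(from_json_like('{"a": 1, "b": 2}', ["a", "b"]))  # (1, 2)
print(from_json_like('{bad-record', ["a", "b"]))       # None
```

The two calls mirror the two rows of the question's DataFrame: the valid record produces a struct-like tuple, while the bad record silently becomes None.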

Upvotes: 4
