Reputation: 317
I am working with Spark 2.1 (Scala 2.11).
I want to load JSON-formatted strings with a defined schema from one dataframe into another. I have tried out some solutions, and the least expensive turns out to be the standard column function from_json. I tried out an example (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-collection.html#from_json) with this function, and it is giving me unexpected results.
val df = spark.read.text("testFile.txt")
df.show(false)
+----------------+
|value |
+----------------+
|{"a": 1, "b": 2}|
|{bad-record |
+----------------+
df.select(from_json(col("value"),
StructType(List(
StructField("a",IntegerType),
StructField("b",IntegerType)
))
)).show(false)
+-------------------+
|jsontostruct(value)|
+-------------------+
|[1,2] |
|null |
+-------------------+
This behavior is similar to mode: PERMISSIVE, which is not the default. By default, it is set to FAILFAST mode, meaning it should throw an exception whenever the input data and the enforced schema do not match.
I tried loading testFile.txt with the DataFrameReader (JSON data source, FAILFAST mode) and successfully caught an exception.
spark.read.option("mode","FAILFAST").json("test.txt").show(false)
---
Caused by: org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {bad-record
---
Though the parsing mode is the same in both cases, why are the respective outputs so different?
Upvotes: 5
Views: 3636
Reputation: 3618
To add to @user11022201's answer: it looks like the options argument can achieve the desired FAILFAST behavior. The code below is in PySpark and was tested with Spark 3.2.2.
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType
spark_session = pyspark.sql.SparkSession.builder.master("local[*]").appName("test").getOrCreate()
data = [
{'value': '{"a": 1, "b": 2}'},
{'value': '{bad-record'},
]
df = spark_session.createDataFrame(data)
schema = StructType([
StructField("a", IntegerType()),
StructField("b", IntegerType())
])
# If options are empty then the error does not happen and null values are added to the dataframe
# options = {}
options = {"mode": "FAILFAST"}
parsed_json_df = df.select(F.from_json(F.col("value"), schema, options))
parsed_json_df.show()
The result of the code above is an exception, which is the desired behavior:
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1236)
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
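For the Scala API used in the question, the equivalent would be the from_json overload that takes an options Map. This is a sketch, assuming the same df and schema as in the question and a Spark version where from_json honors the mode option (e.g. 3.x):
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(List(
  StructField("a", IntegerType),
  StructField("b", IntegerType)
))

// Pass the JSON reader options as a Map; "mode" -> "FAILFAST" makes
// from_json throw on malformed records instead of returning null.
df.select(from_json(col("value"), schema, Map("mode" -> "FAILFAST"))).show(false)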
Upvotes: 1
Reputation: 8711
Note that you are reading the file as a text file and then converting it to JSON. By default, newline is the record delimiter for text files, and if a line contains a valid JSON string, it will be converted correctly with the schema that you define in the from_json() method.
If a line is blank or contains invalid JSON text, you will get NULL.
Check this out:
val df = spark.read.text("in/testFile.txt")
println("Default show()")
df.show(false)
println("Using the from_json method ")
df.select(from_json(col("value"),
StructType(List(
StructField("a",IntegerType),
StructField("b",IntegerType)
))
)).show(false)
When in/testFile.txt has the content below,
{"a": 1, "b": 2 }
it prints
Default show()
+-----------------+
|value |
+-----------------+
|{"a": 1, "b": 2 }|
+-----------------+
Using the from_json method
+--------------------+
|jsontostructs(value)|
+--------------------+
|[1,2] |
+--------------------+
When your input has a blank line,
{"a": 1, "b": 2 }
// Blank line
the result is
Default show()
+-----------------+
|value |
+-----------------+
|{"a": 1, "b": 2 }|
| |
+-----------------+
Using the from_json method
+--------------------+
|jsontostructs(value)|
+--------------------+
|[1,2] |
|null |
+--------------------+
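If the NULL rows come only from blank lines, a minimal workaround (a sketch, assuming the same df as above) is to filter them out before parsing:
import org.apache.spark.sql.functions.{col, from_json, length, trim}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Drop blank lines first so from_json only sees candidate JSON strings.
df.filter(length(trim(col("value"))) > 0)
  .select(from_json(col("value"),
    StructType(List(
      StructField("a", IntegerType),
      StructField("b", IntegerType)
    ))
  )).show(false)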
Upvotes: 0
Reputation: 56
That is expected behavior. from_json is a SQL function, and there is no concept of an (intentional) exception at this level. If the operation fails, the result is undefined: NULL.
While from_json provides an options argument, which allows you to set JSON reader options, this behavior cannot be overridden, for the reason mentioned above.
On a side note, the default mode for DataFrameReader is PERMISSIVE.
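Since a failed parse simply yields NULL, one practical pattern (a sketch, assuming the df and schema from the question) is to keep the raw string next to the parsed column and inspect the rows where parsing returned NULL:
import org.apache.spark.sql.functions.{col, from_json}

// Keep the raw value alongside the parsed struct.
val parsed = df.withColumn("parsed", from_json(col("value"), schema))

// Rows where parsing failed show up with a NULL parsed column.
parsed.filter(col("parsed").isNull).show(false)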
Upvotes: 4