mLC

Reputation: 693

How to capture incorrect (corrupt) JSON records in (Py)Spark Structured Streaming?

I have an Azure Event Hub that streams data in JSON format. I read it as a Spark dataframe and parse the incoming "body" with from_json(col("body"), schema), where schema is pre-defined. In code, it looks like:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType().add(...) # define the incoming JSON schema 

df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema))
)

Now, if there is some inconsistency between the incoming JSON's schema and the defined schema (e.g. the source Event Hub starts sending data in a new format without notice), the from_json() function will not throw an error. Instead, it will put NULL into the fields that are present in my schema definition but missing from the JSON the Event Hub sends.

I want to capture this information and log it somewhere (Spark's log4j, Azure Monitor, warning email, ...).

My question is: what is the best way to achieve this?

Some of my thoughts:

  1. The first thing I can think of is a UDF that checks for the NULLs and raises an exception if there is a problem. I believe it is not possible to send logs to log4j from within the UDF, as the "spark" context cannot be initiated there (on the workers), and one would want to use the default:

    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger('PySpark Logger')

  2. The second thing I can think of is to use the "foreach/foreachBatch" function and put the check logic there.
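
The NULL check from item 1 could be sketched as a plain Python function before wrapping it in a UDF. This is only a sketch under assumptions: the name missing_fields and the idea of comparing top-level keys against the schema's field names are illustrative, not something Spark provides.

```python
import json


def missing_fields(body, expected):
    """Return the expected top-level keys absent from `body`.

    Returns None when `body` is not a JSON object at all
    (malformed text, a JSON array or scalar, or a null body).
    """
    try:
        record = json.loads(body)
    except (TypeError, ValueError):
        # TypeError: body is None; ValueError: body is not valid JSON
        return None
    if not isinstance(record, dict):
        return None
    return [name for name in expected if name not in record]
```

Wrapped with pyspark.sql.functions.udf and an ArrayType(StringType()) return type, and fed schema.fieldNames() as the expected list, this would give one extra column per row that can be filtered and reported. It only inspects top-level keys, so nested structs would need a recursive variant.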

But both of these approaches feel like too much custom code. I was hoping that Spark has something built in for this purpose.

Upvotes: 2

Views: 1687

Answers (1)

Jacek Laskowski

Reputation: 74759

tl;dr You have to do this check logic yourself using foreach or foreachBatch operators.


It turns out I was mistaken thinking that columnNameOfCorruptRecord option could be an answer. It will not work.

Firstly, it won't work due to this:

case _: BadRecordException => null

Secondly, it won't work because of this line, which forces the FAILFAST mode and so disables every other parsing mode (incl. PERMISSIVE, which seems to be the mode used alongside the columnNameOfCorruptRecord option):

new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

In other words, your only option is to use the 2nd item in your list, i.e. foreach or foreachBatch and handle corrupted records yourself.

A solution could use from_json while keeping the initial body column. Any record with incorrect JSON would end up with a null result column, and foreach* would catch it, e.g.

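from pyspark.sql.functions import col, from_json

def handleCorruptRecords(row):
  # foreach calls this once per row, on the executors
  if row["json"] is None:
    # from_json returned null, so the body was corrupt
    pass  # handle it

query = (spark
  .readStream
  .format("eventhubs")
  .options(**ehConfInput)
  .load()
  .select("body", from_json(col("body").cast("string"), schema).alias("json"))
  .writeStream
  .foreach(handleCorruptRecords)
  .start())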
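
A foreachBatch variant of the same idea might look like the sketch below. It assumes the streaming DataFrame carries the body and json columns selected above; the logger name, MAX_SAMPLES and both function names are hypothetical. Depending on the Spark version, from_json may return a struct whose fields are all null rather than a null struct for a malformed body, so the isNull() filter may need adjusting.

```python
import logging

logger = logging.getLogger("corrupt_records")  # hypothetical logger name
MAX_SAMPLES = 20  # assumed cap on rows pulled to the driver per micro-batch


def describe_corrupt(batch_id, bodies):
    """Build one log line for the sampled corrupt bodies of a micro-batch."""
    preview = "; ".join(repr(b)[:200] for b in bodies)
    return f"batch {batch_id}: showing {len(bodies)} unparseable record(s): {preview}"


def handle_corrupt_records(batch_df, batch_id):
    """foreachBatch callback: Spark calls it on the driver once per micro-batch."""
    # Rows where from_json produced no struct at all
    corrupt = batch_df.filter(batch_df["json"].isNull())
    rows = (corrupt
            .select(batch_df["body"].cast("string").alias("body"))
            .limit(MAX_SAMPLES)
            .collect())
    if rows:
        logger.warning(describe_corrupt(batch_id, [r["body"] for r in rows]))
```

It would be attached with .writeStream.foreachBatch(handle_corrupt_records).start() in place of the foreach call. Because the callback runs on the driver, ordinary driver-side logging is available there, which avoids the problem of logging from inside a UDF on the workers.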

Upvotes: 1
