rockyroad

Reputation: 2957

Skipping malformed Avro records in Spark

I'm working with Spark 2.4.7 to ingest Avro records from Kafka, using the Scala API. Unfortunately, malformed data sometimes comes through, and this causes my job to fail.

Here is some sample code:

val data = spark.readStream.format("kafka")
  .options(config.kafkaOptions ++ Map("subscribe" -> inputTopic))
  .option("mode", "PERMISSIVE")
  .load()
---
val decodedColumn = org.apache.spark.sql.avro.from_avro($"value", schema) as "value"
---
val q = data
          .writeStream
          .option("checkpointLocation", checkpointLocation)
          .trigger(streaming.Trigger.ProcessingTime(outputTriggerTime))
          .foreachBatch { (df, id) => {
            val dataFrame : DataFrame = df
              .filter(row => row.schema.equals(configuredStructTypeSchema))
              .select(decodedColumn)
            val outputData = spark.createDataFrame(dataFrame.rdd, dataFrame.schema)
              .select("value.*")
            writer.write(outputData)
          }}.start

The filter clause is something I added to try to handle bad data. For my testing, I am sending gibberish text as input instead of a proper Avro record. I get the error below both with and without the filter clause:

org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)

...

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column Serialization stack: - object not serializable (class: org.apache.spark.sql.Column, value: from_avro(value) AS value)

How would I ensure that my code only processes incoming Avro data that adheres to a configured schema, and discards everything else?

Upvotes: 0

Views: 83

Answers (1)

parisni

Reputation: 1152

From the documentation, your filter is unlikely to work as written, since a dataframe row coming from the Kafka source always has the schema below, regardless of what the payload contains:

Column                Type
key (optional)        string or binary
value (required)      string or binary
headers (optional)    array
topic (optional)      string
partition (optional)  int
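
A quick way to confirm this is to print the schema of the streaming dataframe; the columns are fixed by the Kafka source no matter what bytes sit in value (output sketched below for Spark 2.4; exact columns may vary slightly by version):

data.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)

So comparing row.schema against your configured Avro struct type will never match good records, let alone reject bad ones.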

What you are looking for is the Avro schema of the value column, which most likely holds the Avro message stored as binary. One way would be to try/catch deserializing the value with your expected Avro schema inside the filter, as sketched below.
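A minimal sketch of that idea (the helper name isValidAvro and the schema JSON string expectedSchemaJson are placeholders, not from the original code; the Schema is re-parsed inside the function because Avro 1.8's Schema, which Spark 2.4 ships, is not serializable):

import scala.util.Try
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Returns true only when the bytes decode cleanly against the expected schema.
// Parsing the schema on every call is wasteful; in practice you would cache
// the parsed Schema per executor (e.g. in a lazy val inside an object).
def isValidAvro(bytes: Array[Byte], schemaJson: String): Boolean = {
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  Try {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }.isSuccess
}

// Inside foreachBatch: keep only rows whose value deserializes,
// then decode the survivors as before.
val validDf = df.filter(row => isValidAvro(row.getAs[Array[Byte]]("value"), expectedSchemaJson))

Note that Avro's binary encoding is not self-describing, so some garbage input can still decode without error against a permissive schema; this check is best-effort rather than a guarantee.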

Upvotes: 0
