Reputation: 2957
I'm working with Spark 2.4.7 to ingest Avro records from Kafka. Unfortunately, malformed data sometimes comes through, and this causes my job to fail. I'm using the Scala API.
Here is some sample code:
val data = spark.readStream.format("kafka")
.options(config.kafkaOptions ++ Map("subscribe" -> inputTopic))
.option("mode", "PERMISSIVE")
.load()
---
val decodedColumn = org.apache.spark.sql.avro.from_avro($"value", schema) as "value"
---
val q = data
.writeStream
.option("checkpointLocation", checkpointLocation)
.trigger(streaming.Trigger.ProcessingTime(outputTriggerTime))
.foreachBatch { (df, id) =>
  val dataFrame: DataFrame = df
    .filter(row => row.schema.equals(configuredStructTypeSchema))
    .select(decodedColumn)
  val outputData = spark.createDataFrame(dataFrame.rdd, dataFrame.schema)
    .select("value.*")
  writer.write(outputData)
}.start
The filter clause was something I added to try to handle bad data. For my testing, I am sending some gibberish text as input instead of a proper Avro record. I get the error below, with and without the filter clause:
org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: from_avro(value) AS `value`)
How would I ensure that my code only processes incoming Avro data that adheres to a configured schema, and discards everything else?
Upvotes: 0
Views: 83
Reputation: 1152
Per the documentation, your filter is unlikely to work, since a dataframe row read from Kafka will always have the schema below:

Column                Type
key (optional)        string or binary
value (required)      string or binary
headers (optional)    array
topic (optional)      string
partition (optional)  int
What you are looking for is the Avro schema of the value column, which is most likely the Avro message stored as binary. One way would be to try/catch deserializing the value with your expected Avro schema within the filter.
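A minimal sketch of that try/catch approach, using the plain Avro Java API. The schema JSON and field names here are illustrative placeholders; substitute your configured schema. Only the schema string is captured by the UDF closure, which sidesteps closure-serialization issues like the one in the question:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import scala.util.Try

// Hypothetical schema; substitute your configured Avro schema JSON.
val schemaJson: String =
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"long"}]}"""

// True only if the bytes deserialize cleanly against the schema.
def decodesWithSchema(bytes: Array[Byte], json: String): Boolean =
  bytes != null && Try {
    val schema = new Schema.Parser().parse(json)
    val reader = new GenericDatumReader[GenericRecord](schema)
    reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
  }.isSuccess

// Wired into the question's stream (needs import org.apache.spark.sql.functions.udf):
// val isValidAvro = udf((b: Array[Byte]) => decodesWithSchema(b, schemaJson))
// val valid = data
//   .filter(isValidAvro($"value"))
//   .select(org.apache.spark.sql.avro.from_avro($"value", schemaJson) as "value")
//   .select("value.*")
```

One caveat: Avro binary encoding carries no field tags, so for very permissive schemas some garbage bytes can still decode "successfully"; and if your producer uses the Confluent wire format, the payload has a small header before the Avro body, so raw `from_avro`/`GenericDatumReader` decoding would need to skip it first.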
Upvotes: 0