Reputation: 342
I'm using Spark Structured Streaming as described on this page.
I get the messages from the Kafka topic correctly, but the value is in Avro format. Is there a way to deserialize the Avro records (something like the KafkaAvroDeserializer approach)?
Upvotes: 0
Views: 3067
Reputation: 35219
Spark >= 2.4
You can use the from_avro function from the spark-avro library.
import org.apache.spark.sql.avro._
val schema: String = ???
df.withColumn("value", from_avro($"value", schema))
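For context, a minimal end-to-end sketch of this approach could look like the following. The broker address, topic name, and record schema are placeholders, and it assumes the matching spark-avro package (e.g. org.apache.spark:spark-avro_2.12) is on the classpath (on Spark 3.x, import from_avro from org.apache.spark.sql.avro.functions instead):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._

val spark = SparkSession.builder.appName("kafka-avro").getOrCreate()
import spark.implicits._

// Writer schema of the Avro payload, as a JSON string (hypothetical record layout).
val schema: String =
  """{
    |  "type": "record",
    |  "name": "Reading",
    |  "fields": [
    |    {"name": "id",    "type": "string"},
    |    {"name": "value", "type": "double"}
    |  ]
    |}""".stripMargin

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "readings")                     // placeholder topic
  .load()

// value arrives as binary Avro; from_avro decodes it into a struct column.
val decoded = df.withColumn("value", from_avro($"value", schema))

One caveat: from_avro expects plain Avro bytes. If the producer used Confluent's KafkaAvroSerializer, each message starts with a 5-byte prefix (a magic byte plus the schema id) that has to be stripped before decoding.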
Spark < 2.4
Define a function which takes an Array[Byte] (the serialized object):
import scala.reflect.runtime.universe.TypeTag
def decode[T : TypeTag](bytes: Array[Byte]): T = ???
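As one possible shape of that function, here is a sketch that decodes into a GenericRecord with the plain Avro API and maps it onto a case class by hand. The Reading case class and its fields are made-up examples, and for a streaming job you would want to cache the parsed schema and reader rather than rebuild them per record:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hypothetical target type; adjust to your own record layout.
case class Reading(id: String, value: Double)

// Writer schema as a JSON string.
val schemaJson: String = ???

def decodeReading(bytes: Array[Byte]): Reading = {
  val schema  = new Schema.Parser().parse(schemaJson)
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  val record  = reader.read(null, decoder)
  Reading(record.get("id").toString, record.get("value").asInstanceOf[Double])
}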
which deserializes the Avro data and creates an object that can be stored in a Dataset.
Create a udf based on this function:
val decodeUdf = udf(decode _)
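Note that because decode is generic, the compiler needs a concrete return type to derive the udf; with the hypothetical decodeReading from the sketch above, that would be:

val decodeUdf = udf(decodeReading _)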
Call the udf on the value column:
val df = spark
.readStream
.format("kafka")
...
.load()
df.withColumn("value", decodeUdf($"value"))
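To actually run the stream you would attach a sink, for example the console sink for quick inspection:

df.withColumn("value", decodeUdf($"value"))
  .writeStream
  .format("console")
  .start()
  .awaitTermination()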
Upvotes: 1