bajky

Reputation: 342

Avro format deserialization in Spark structured stream

I'm using Spark Structured Streaming as described on this page.

I get the correct messages from the Kafka topic, but the value is Avro-encoded. Is there a way to deserialize the Avro records (something like the KafkaAvroDeserializer approach)?

Upvotes: 0

Views: 3067

Answers (1)

Alper t. Turker

Reputation: 35219

Spark >= 2.4

You can use the from_avro function from the spark-avro library:

import org.apache.spark.sql.avro._

val schema: String = ???
df.withColumn("value", from_avro($"value", schema))
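
A fuller sketch of the Spark >= 2.4 path, reading from Kafka and decoding the value column in one go. The bootstrap servers, topic name, and record schema below are placeholders, not from the original answer; substitute your own. Note that from_avro expects plain Avro binary, so payloads written by Confluent's KafkaAvroSerializer (which prepend a 5-byte header) would need that header stripped first.

```scala
// Hedged sketch: hypothetical topic, servers, and schema.
// Requires the external module, e.g. --packages org.apache.spark:spark-avro_2.11:2.4.0
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._

val spark = SparkSession.builder.appName("avro-stream").getOrCreate()
import spark.implicits._

// Avro schema as a JSON string (hypothetical record with two fields)
val schema: String =
  """{"type": "record", "name": "Event", "fields": [
    |  {"name": "id", "type": "string"},
    |  {"name": "amount", "type": "double"}
    |]}""".stripMargin

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumption
  .option("subscribe", "events")                       // assumption
  .load()

// Kafka delivers value as Array[Byte]; from_avro turns it into a struct column
val decoded = df.select(from_avro($"value", schema).as("event"))
```

From there you can select individual fields with the usual column syntax, e.g. $"event.id".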

Spark < 2.4

  • Define a function that takes an Array[Byte] (the serialized object):

    import scala.reflect.runtime.universe.TypeTag
    
    def decode[T : TypeTag](bytes: Array[Byte]): T = ???
    

    which deserializes the Avro data and creates an object that can be stored in a Dataset.

  • Create a udf based on the function:

    val decodeUdf = udf(decode _)
    
  • Call the udf on the value column:

    val df = spark
      .readStream
      .format("kafka")
      ...
      .load()
    
    df.withColumn("value", decodeUdf($"value"))
    

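One way the decode step could be fleshed out is with Avro's own GenericDatumReader. This is only a sketch under stated assumptions: the payload is plain Avro binary (no Confluent 5-byte header), the writer schema is known to the application, and the "id" field extracted below is hypothetical.

```scala
// Hedged sketch: decode plain Avro binary inside a Spark UDF.
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.functions.udf

val schemaJson: String = ??? // your writer schema as JSON

// Parse the schema and build the reader inside the function so the
// closure only captures the (serializable) schema string.
def decode(bytes: Array[Byte]): String = {
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  val record = reader.read(null, decoder)
  record.get("id").toString // hypothetical field name
}

val decodeUdf = udf(decode _)
```

Re-parsing the schema on every call is wasteful; in practice you would cache the reader per executor (e.g. in a lazy val on a companion object), but the shape above keeps the serialization story simple.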
Upvotes: 1
