Reputation: 343
I am using Scala and consuming data from Kafka with the following Spark Streaming approach:
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
The above call returns an InputDStream, and I can see the data in raw/binary format by printing each line with println(line).
But I need to apply an Avro schema (which I have available) to the raw/binary data in order to see it in the expected JSON format. To apply the Avro schema, I need to convert the above InputDStream to Array[Byte], which is what Avro consumes.
Can someone please tell me how to convert the InputDStream to Array[Byte]?
Or
If you know a better way to apply an Avro schema to an InputDStream (in Spark Streaming), please share.
Upvotes: 3
Views: 1412
Reputation: 149538
You need to do two things. The first is to use Kafka's DefaultDecoder, which gives you an Array[Byte] for the value type:
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val lines: DStream[(String, Array[Byte])] =
  KafkaUtils
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
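For completeness, the ssc, kafkaParams, and topics used above might be set up as follows. This is only a sketch with placeholder values; the broker address, batch interval, and topic name are assumptions you would replace with your own:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder setup: app name, batch interval, broker list, and topic are illustrative.
val conf = new SparkConf().setAppName("avro-stream")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topics = Set("my-topic")
```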
And then you need to apply your Avro deserialization logic via an additional map:
lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
where avroDeserializer is an arbitrary class of yours which knows how to create your type from Avro bytes.
I personally use avro4s to get case class deserialization via macros.
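If you don't want macro-based case class mapping, a minimal sketch of such a deserializer using Avro's GenericDatumReader could look like the following. The class name and constructor are hypothetical, and it assumes the Kafka values are raw Avro binary (no Confluent schema-registry wire-format header in front of them):

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hypothetical helper: decodes raw Avro binary bytes into a GenericRecord
// using a schema you supply as a JSON string.
class AvroDeserializer(schemaJson: String) extends Serializable {
  // Schema and reader are not serializable, so build them lazily on each executor.
  @transient private lazy val schema: Schema =
    new Schema.Parser().parse(schemaJson)
  @transient private lazy val reader =
    new GenericDatumReader[GenericRecord](schema)

  def deserialize(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}
```

Calling toString on the resulting GenericRecord yields its JSON representation, which is the "expected json format" the question asks about.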
Upvotes: 2