k_b

Reputation: 343

Converting Spark-Kafka InputDStream to Array[Byte]

I am using Scala and consuming data from Kafka with the Spark Streaming approach below:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

The variable above returns an InputDStream, through which I can see the data in raw/binary format using println(line).

But I need to apply an Avro schema (which I have available) to the raw/binary data in order to see it in the expected JSON format. To apply the Avro schema, I need to convert the InputDStream above to Array[Byte], which is what Avro consumes.

Can someone please let me know how to convert the InputDStream to Array[Byte]?

Or

If you know a better way to apply an Avro schema to a Spark Streaming InputDStream, please share.

Upvotes: 3

Views: 1412

Answers (1)

Yuval Itzchakov

Reputation: 149538

There are two things you need to do. The first is to use Kafka's DefaultDecoder, which gives you an Array[Byte] for the value type:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val lines: DStream[(String, Array[Byte])] =
  KafkaUtils
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
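For reference, here is a minimal sketch of the ssc, kafkaParams, and topics this call assumes; the application name, batch interval, broker address, and topic name are all placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder setup; adjust the app name, batch interval, broker, and topic.
val conf = new SparkConf().setAppName("avro-consumer")
val ssc = new StreamingContext(conf, Seconds(5))

// "metadata.broker.list" is the broker setting the 0.8 direct-stream API reads.
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topics = Set("my-topic")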

And then you need to apply your Avro deserialization logic via an additional map:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

where avroDeserializer is a class of yours that knows how to create your type from Avro bytes.
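For example, here is a minimal sketch of such a deserializer built on Avro's GenericDatumReader; the schema JSON string is assumed to be your own:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Sketch of a reusable deserializer; pass in your own schema JSON.
class AvroDeserializer(schemaJson: String) extends Serializable {
  // Transient and lazy so the non-serializable schema and reader
  // are rebuilt on each executor instead of shipped in the closure.
  @transient private lazy val schema = new Schema.Parser().parse(schemaJson)
  @transient private lazy val reader = new GenericDatumReader[GenericRecord](schema)

  def deserialize(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}

Calling toString on the resulting GenericRecord renders it as JSON, which is the output format the question asks for.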

I personally use avro4s to get case class deserialization via macros.
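As a sketch of that approach, avro4s's RecordFormat can turn the GenericRecord produced above into a case class (assuming avroDeserializer is an instance of the class sketched earlier); the Person case class is a made-up example, and the exact avro4s API varies by version:

import com.sksamuel.avro4s.RecordFormat
import org.apache.spark.streaming.dstream.DStream

// Hypothetical case class matching your Avro schema.
case class Person(name: String, age: Int)

// RecordFormat derives GenericRecord <-> case class conversions via macros.
val format = RecordFormat[Person]
val people: DStream[Person] =
  lines.map { case (_, bytes) => format.from(avroDeserializer.deserialize(bytes)) }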

Upvotes: 2
