Reputation: 139
Code snippet:
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
write2hdfs.foreachRDD(rdd => {
  rdd.foreach(avroRecord => {
    println(avroRecord)
    //val rawByte = avroRecord.getBytes("UTF-8")
  })
})
Issue faced:
avroRecord holds the Avro-encoded messages received from the Kafka stream. With the code above, avroRecord is a String by default, and Strings in Scala use UTF-16 encoding internally.
Because of this, the deserialization is not correct and I am facing issues. The messages were Avro-encoded with UTF-8 when they were sent to the Kafka stream.
I need avroRecord as pure bytes instead of getting it as a String and then converting it to bytes (internally the String would apply UTF-16 encoding), or a way to get avroRecord itself in UTF-8. I am stuck at a dead end here.
I need a way forward on this problem.
Thanks in advance.
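To illustrate the corruption (a standalone sketch, not part of my pipeline): arbitrary binary data cannot survive a bytes-to-String-to-bytes round trip, because byte sequences that are not valid in the chosen charset get replaced during decoding.
// Standalone sketch: invalid UTF-8 sequences are replaced (with U+FFFD) when
// decoded into a String, so binary Avro payloads do not round-trip intact.
val original = Array(0x00, 0xC3, 0x28, 0xFF).map(_.toByte) // not valid UTF-8
val roundTripped = new String(original, "UTF-8").getBytes("UTF-8")
println(original.sameElements(roundTripped)) // prints false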
UPDATE:
Changed code snippet:
val ssc = new StreamingContext(sparkConf, Seconds(5))
//val ssc = new JavaStreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000")
//val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topics, StorageLevel.NONE)
Imports done:
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord, GenericDatumWriter, GenericData}
import org.apache.avro.io.{DecoderFactory, DatumReader, DatumWriter, BinaryDecoder}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import java.io.{File, IOException}
//import java.io.*
import org.apache.commons.io.IOUtils
import _root_.kafka.serializer.{StringDecoder, DefaultDecoder}
import _root_.kafka.message.Message
import scala.reflect._
Compilation error:
Compiling 1 Scala source to /home/spark_scala/spark_stream_project/target/scala-2.10/classes...
[error] /home/spark_scala/spark_stream_project/src/main/scala/sparkStreaming.scala:34: overloaded method value createStream with alternatives:
[error]   (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyTypeClass: Class[String], valueTypeClass: Class[kafka.message.Message], keyDecoderClass: Class[kafka.serializer.StringDecoder], valueDecoderClass: Class[kafka.serializer.DefaultDecoder], kafkaParams: java.util.Map[String,String], topics: java.util.Map[String,Integer], storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,kafka.message.Message]
[error]   (ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: scala.collection.immutable.Map[String,String], topics: scala.collection.immutable.Map[String,Int], storageLevel: org.apache.spark.storage.StorageLevel)(implicit evidence$1: scala.reflect.ClassTag[String], implicit evidence$2: scala.reflect.ClassTag[kafka.message.Message], implicit evidence$3: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$4: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, kafka.message.Message)]
[error] cannot be applied to (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,String], String, org.apache.spark.storage.StorageLevel)
[error]   val lines = KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
[error]               ^
What is wrong here? Also, I don't see the suggested overload defined in the KafkaUtils API doc. The API doc I am referring to: https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html
Looking forward to your support.
Thanks.
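For context, the compiler's "cannot be applied to (..., String, ...)" points at the topics argument: a plain comma-separated String is being passed where both overloads expect a map of topic names. A minimal sketch of the call shape that matches the second (Scala) alternative, using the topicMap already built above:
// topics must be a Map[String, Int] (topic -> consumer thread count),
// so pass topicMap rather than the raw comma-separated topics string.
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)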
UPDATE 2:
Tried with the suggested corrections!
Code snippet:
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
Facing a runtime exception:
java.lang.ClassCastException: [B cannot be cast to kafka.message.Message
On the line:
KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
Ideally, filtering this DStream[(String, Message)] should also work, right? Do I need to extract the payload from the Message before applying the map?
I need inputs, please. Thanks.
Upvotes: 1
Views: 733
Reputation: 139
This worked for me:
val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
My requirement was to get the byte array, so I changed the value type to Array[Byte] instead of kafka.message.Message.
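From there, a minimal decoding sketch into Avro GenericRecords (the writer schema string schemaJson is a placeholder, and it assumes records were written with the raw binary encoding, not the Avro container-file format):
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

val schemaJson: String = ??? // placeholder: the schema the producer wrote with
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
val records = write2hdfs.map { bytes =>
  // Built inside the closure so nothing non-serializable is shipped to executors;
  // in real code, reuse these per partition (e.g. via mapPartitions).
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}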
Upvotes: 0
Reputation: 18424
You could do something like this:
import kafka.serializer.{StringDecoder, DefaultDecoder}
import kafka.message.Message
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum, "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000")
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topics, storageLevel)
This should get you a DStream[(String, kafka.message.Message)], and you should be able to retrieve the raw bytes and convert to Avro from there.
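For that last step, something like this sketch (Message exposes its body as a java.nio.ByteBuffer via payload, so copy the remaining bytes into an array before handing them to Avro):
val rawBytes = lines.map { case (_, message) =>
  // message.payload returns a ByteBuffer slice over the message body.
  val buf = message.payload
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes) // copy the bytes out of the buffer
  bytes
}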
Upvotes: 0