kaushik H S

Reputation: 139

Don't want String as the value type when using foreach in Scala Spark Streaming?

Code snippet:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
write2hdfs.foreachRDD(rdd => {
  rdd.foreach(avroRecord => {
    println(avroRecord)
    //val rawByte = avroRecord.getBytes("UTF-8")
  })
})

Issue faced:

avroRecord holds the Avro-encoded messages received from the Kafka stream. With the code above, avroRecord is a String by default, and JVM strings use UTF-16 internally.

Because of this, deserialization is incorrect and fails. The messages were Avro-encoded with UTF-8 when they were sent to the Kafka stream.
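A minimal sketch (not from the original post, with made-up byte values) of why round-tripping arbitrary binary data through a String is lossy:

// Routing Avro bytes through a String corrupts them: byte sequences that
// are not valid UTF-8 get replaced with U+FFFD on decode and cannot be recovered.
val original: Array[Byte] = Array(0x02, 0xC3.toByte, 0x28, 0xFF.toByte) // 0xC3 0x28 is not valid UTF-8
val asString = new String(original, "UTF-8")   // lossy decode
val roundTripped = asString.getBytes("UTF-8")  // re-encode
println(original.sameElements(roundTripped))   // prints false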

I need avroRecord as raw bytes instead of getting a String and then converting it to bytes (internally the String re-encodes the data, corrupting it), or a way to get avroRecord in UTF-8 directly. I am stuck at a dead end here.

I need a way forward for this problem.

Thanks in advance.

UPDATE:

Changed code snippet:

val ssc = new StreamingContext(sparkConf, Seconds(5))
//val ssc = new JavaStreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000")

//val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topics, StorageLevel.NONE)

Imports done:

import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord, GenericDatumWriter, GenericData}
import org.apache.avro.io.{DecoderFactory, DatumReader, DatumWriter, BinaryDecoder}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import java.io.{File, IOException}
import org.apache.commons.io.IOUtils
import _root_.kafka.serializer.{StringDecoder, DefaultDecoder}
import _root_.kafka.message.Message
import scala.reflect._

Compilation error:

Compiling 1 Scala source to /home/spark_scala/spark_stream_project/target/scala-2.10/classes...
[error] /home/spark_scala/spark_stream_project/src/main/scala/sparkStreaming.scala:34: overloaded method value createStream with alternatives:
[error]   (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyTypeClass: Class[String],valueTypeClass: Class[kafka.message.Message],keyDecoderClass: Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[kafka.serializer.DefaultDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Map[String,Integer],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,kafka.message.Message]
[error]   (ssc: org.apache.spark.streaming.StreamingContext,kafkaParams: scala.collection.immutable.Map[String,String],topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)(implicit evidence$1: scala.reflect.ClassTag[String], implicit evidence$2: scala.reflect.ClassTag[kafka.message.Message], implicit evidence$3: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$4: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, kafka.message.Message)]
[error]  cannot be applied to (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,String], String, org.apache.spark.storage.StorageLevel)
[error] val lines = KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
[error]             ^

[error] one error found

What is wrong here? Also, I don't see the suggested constructor defined in the KafkaUtils API doc. The API doc I am referring to: https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html
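For reference, the Scala overload in the error message takes the topics argument as a Map[String, Int] (topic name to number of consumer threads), not a String. A minimal sketch of a call matching that signature, reusing the names from the snippet above:

// topics is the comma-separated topic list from the snippet above;
// the Scala overload wants Map[topic -> thread count], not the raw String.
val topicMap: Map[String, Int] = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)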

Looking forward to your support.

Thanks.

UPDATE 2:

Tried with the suggested corrections!

Code snippet:

val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)

Facing a runtime exception:

java.lang.ClassCastException: [B cannot be cast to kafka.message.Message

On the lines:

KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)

Ideally, filtering this DStream[(String, Message)] should also work, right? Do I need to extract the payload from Message before applying the map?
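For what it's worth, [B in the exception is the JVM name for Array[Byte], so the decoder appears to be producing byte arrays rather than kafka.message.Message (the first answer below confirms this). If the values really were Message, extracting the payload would look roughly like this sketch (assuming the old kafka.message.Message API, whose payload returns a java.nio.ByteBuffer):

// Hypothetical sketch: pull the raw value bytes out of kafka.message.Message.
val write2hdfs = lines
  .filter(_._1 == "lineitem")
  .map { case (_, msg) =>
    val buf = msg.payload                  // ByteBuffer holding the Avro-encoded value
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)                         // copy the bytes out of the buffer
    bytes
  }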

I need inputs, please. Thanks.

Upvotes: 1

Views: 733

Answers (2)

kaushik H S

Reputation: 139

This worked for me:

val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)

My requirement was to get the byte array, so I changed the value type to Array[Byte] instead of kafka.message.Message.
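With that change the stream is a DStream[(String, Array[Byte])], so the original filter/map pipeline from the question hands over raw bytes directly, untouched by any String decoding. A minimal sketch reusing the names from the question:

// Values are now the raw Avro-encoded bytes, ready for binary decoding.
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
write2hdfs.foreachRDD(rdd => {
  rdd.foreach(avroBytes => {
    // avroBytes: Array[Byte], not a String, so no UTF-16/UTF-8 corruption
    println(avroBytes.length)
  })
})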

Upvotes: 0

Joe K

Reputation: 18424

You could do something like this:

import kafka.serializer.{StringDecoder, DefaultDecoder}
import kafka.message.Message

val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> group,
    "zookeeper.connection.timeout.ms" -> "10000")
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topics, storageLevel)

This should get you a DStream[(String, kafka.message.Message)], and you should be able to retrieve the raw bytes and convert to Avro from there.
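To sketch that last step (turning the raw bytes back into an Avro record), something like the following should work, assuming the writer schema is available; the schema string here is only a placeholder:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Parse the writer schema (placeholder schema shown for illustration).
val schema = new Schema.Parser().parse("""{"type":"record","name":"LineItem","fields":[]}""")
val reader = new GenericDatumReader[GenericRecord](schema)

def decode(bytes: Array[Byte]): GenericRecord = {
  // binaryDecoder consumes the Avro binary payload straight from the byte
  // array; no String conversion is involved, so the bytes stay intact.
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}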

Upvotes: 0
