Reputation: 1647
I am facing a weird issue: I am reading Avro records from Kafka, trying to deserialize them, and storing the result in a file. I can get the records from Kafka, but when I apply a function to the RDD records, nothing happens.
import java.nio.ByteBuffer
import java.util.UUID
import io.confluent.kafka.serializers.KafkaAvroDecoder
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream}

object KafkaConsumer extends SparkJobLogging {
  var schemaRegistry: SchemaRegistryClient = null
  val url = "url:8181"
  schemaRegistry = new CachedSchemaRegistryClient(url, 1000)

  // Direct stream of (key, raw Avro bytes) pairs from the topic
  def createKafkaStream(ssc: StreamingContext): DStream[(String, Array[Byte])] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "zk.server:2181",
      "" -> s"${UUID.randomUUID().toString}",
      "auto.offset.reset" -> "smallest",
      "bootstrap.servers" -> "bootstrap.server:9092",
      "" -> "6000",
      "schema.registry.url" -> "registry.url:8181"
    )
    val topic = "my.topic"
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Set(topic))
  }

  def processRecord(avroStream: Array[Byte]) = {
    println(AvroDeserializer.toRecord(avroStream, schemaRegistry))
  }

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroDeserilizer")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val topicStream = createKafkaStream(ssc).map(_._2)
    topicStream.foreachRDD {
      rdd => if (!rdd.isEmpty()) {
        rdd.foreach(avroRecords => processRecord(avroRecords))
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

object AvroDeserializer extends SparkJobLogging {
  def toRecord(buffer: Array[Byte], registry: SchemaRegistryClient): GenericRecord = {
    val bb = ByteBuffer.wrap(buffer)
    bb.get() // consume MAGIC_BYTE
    val schemaId = bb.getInt // consume schemaId
    val schema = registry.getByID(schemaId) // consult the Schema Registry
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(buffer, bb.position(), bb.remaining(), null)
    reader.read(null, decoder) // null -> as we are not providing any datum
  }
}
Till statement
everything works fine and I see the exact record counts in the log. However after that nothing works.
When I tried
val record = rdd.first()
it worked, but
rdd.foreach(avroRecords => processRecord(avroRecords))
does not work. It just prints the following on every streaming batch:
17/05/14 01:01:24 INFO scheduler.DAGScheduler: Job 2 finished: foreach at KafkaConsumer.scala:56, took 42.684999 s
17/05/14 01:01:24 INFO scheduler.JobScheduler: Finished job streaming job 1494738000000 ms.0 from job set of time 1494738000000 ms
17/05/14 01:01:24 INFO scheduler.JobScheduler: Total delay: 84.888 s for time 1494738000000 ms (execution: 84.719 s)
17/05/14 01:01:24 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
17/05/14 01:01:24 INFO scheduler.InputInfoTracker: remove old batch metadata:
17/05/14 01:01:26 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
17/05/14 01:01:26 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/05/14 01:01:29 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
17/05/14 01:01:29 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
It keeps printing only the last two lines in the log until the next streaming batch runs.
Upvotes: 1
Views: 1377
Reputation: 1647
The approach above did not work for me, but I found a different way in the Confluent documentation. The KafkaAvroDecoder communicates with the Schema Registry, fetches the schema, and deserializes the data, so it removes the need for a custom deserializer.
import io.confluent.kafka.serializers.KafkaAvroDecoder

val kafkaParams = Map[String, String](
  "" -> brokers,
  "schema.registry.url" -> schemaRegistry,
  "key.converter.schema.registry.url" -> schemaRegistry,
  "value.converter.schema.registry.url" -> schemaRegistry,
  "auto.offset.reset" -> "smallest")
val topicSet = Set(topics)
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicSet).map(_._2)
messages.foreachRDD {
  rdd => if (!rdd.isEmpty()) {
    // ... process the decoded records here
  }
}
Dependency jar: kafka-avro-serializer-3.1.1.jar. This is working perfectly for me right now, and I hope it will be helpful to someone in the future.
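For anyone wondering what the decoder takes off your hands: a message written by the Confluent Avro serializer starts with a 5-byte header (one magic byte equal to 0, then a 4-byte big-endian schema id), followed by the Avro binary payload. A minimal sketch of reading that header with only the standard library is below. The names WireFormatSketch, Header and parseHeader are made up for illustration and are not part of any Confluent API.

```scala
import java.nio.ByteBuffer

object WireFormatSketch {
  // Hypothetical holder for the parsed header; not a Confluent class.
  final case class Header(magic: Byte, schemaId: Int, payloadOffset: Int, payloadLength: Int)

  def parseHeader(message: Array[Byte]): Header = {
    require(message.length >= 5, "message shorter than the 5-byte header")
    val bb = ByteBuffer.wrap(message) // ByteBuffer is big-endian by default
    val magic = bb.get()              // first byte: magic byte
    require(magic == 0, s"unexpected magic byte: $magic")
    val schemaId = bb.getInt          // next four bytes: schema id
    // Whatever remains is the Avro-encoded record body.
    Header(magic, schemaId, bb.position(), bb.remaining())
  }
}
```

The offset and length it returns are the same values the custom deserializer in the question passes to binaryDecoder; with KafkaAvroDecoder you never have to touch them.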
Upvotes: 0
Reputation: 73
val topicStream = createKafkaStream(ssc).map(_._2)
topicStream.foreachRDD {
  rdd => if (!rdd.isEmpty()) {
    rdd.foreach(avroRecords => processRecord(avroRecords))
  }
}
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.
DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
Upvotes: 1
Reputation: 4296
Your println statements are being run on distributed workers, not in the current process, so you don't see them. You could try replacing println with a logging call to verify this.
Ideally you should turn your DStream[Array[Byte]] into a DStream[GenericRecord] and write that to a file, using .saveAsTextFiles or something similar. You may want a take() in there (for example rdd.take(n) inside foreachRDD), because the stream could be infinite.
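To make the worker-versus-driver point concrete, here is a rough sketch that pulls a bounded sample back to the driver before printing. It is not the asker's job: it assumes Spark Streaming is on the classpath, runs with a local[2] master, and uses queueStream as a stand-in for the Kafka stream. The object and method names are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object DriverSideOutputSketch {
  // Feeds one batch through a local streaming context and returns
  // the records that were brought back to the driver.
  def run(input: Seq[String], limit: Int): Seq[String] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DriverSideOutputSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Assumption: a queue-backed stream replaces the Kafka direct stream.
    val queue = mutable.Queue[RDD[String]](ssc.sparkContext.parallelize(input))
    val seen = mutable.ArrayBuffer.empty[String] // lives on the driver

    ssc.queueStream(queue).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // take(n) is an RDD action that returns at most n records to the
        // driver, so the println below runs in the driver process.
        val sample = rdd.take(limit)
        seen.synchronized { seen ++= sample }
        sample.foreach(println)
      }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    seen.synchronized { seen.toList }
  }
}
```

Calling DriverSideOutputSketch.run(Seq("a", "b", "c"), 2) should print the sampled records in the driver's console. To persist every batch instead of printing a sample, DStream.saveAsTextFiles(prefix) writes one output directory per batch interval.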
Upvotes: 1