Ganesh

Reputation: 61

Unable to find encoder for type stored in a Dataset when streaming MongoDB data through Kafka

I want to tail the MongoDB oplog and stream it through Kafka, so I found the Debezium Kafka CDC connector, which tails the Mongo oplog using its built-in serialization.
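
For reference, the connector is registered with a configuration along the lines of the properties below; the connector name and the replica set address are placeholders for my local setup, and the logical name productCollection is what produces the topic name used later in the code:

name=mongodb-oplog-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# placeholder replica set address for my local MongoDB
mongodb.hosts=rs0/127.0.0.1:27017
# logical name: topics are named <logical name>.<database>.<collection>
mongodb.name=productCollection
collection.whitelist=inventory.Product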

The Schema Registry uses the converters below for serialization:

'key.converter=io.confluent.connect.avro.AvroConverter' and
'value.converter=io.confluent.connect.avro.AvroConverter'

Below are the library dependencies I'm using in the project:

libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "3.1.2"

libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.10.2.0"

Below is the streaming code which deserializes the Avro data:

import org.apache.spark.sql.{Dataset, SparkSession}
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

import scala.collection.JavaConverters._

object KafkaStream {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder
      .master("local")
      .appName("kafka")
      .getOrCreate()
    sparkSession.sparkContext.setLogLevel("ERROR")

    import sparkSession.implicits._

    case class DeserializedFromKafkaRecord(key: String, value: String)

    val schemaRegistryURL = "http://127.0.0.1:8081"

    val topicName = "productCollection.inventory.Product"
    val subjectValueName = topicName + "-value"

    //create RestService object
    val restService = new RestService(schemaRegistryURL)

    //.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    //Use Avro parsing classes to get Avro Schema
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

    //key schema is typically just string but you can do the same process for the key as the value
    val keySchemaString = "\"string\""
    val keySchema = parser.parse(keySchemaString)

    //Create a map with the Schema registry url.
    //This is the only required configuration for Confluent's KafkaAvroDeserializer.
    val props = Map("schema.registry.url" -> schemaRegistryURL)

    //Declare SerDe vars before using Spark structured streaming map. Avoids a non-serializable class exception.
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null

    //Create structured streaming DF to read from the topic.
    val rawTopicMessageDF = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 20)  //remove for prod
      .load()
    rawTopicMessageDF.printSchema()

    //instantiate the SerDe classes if not already, then deserialize!
    val deserializedTopicMessageDS = rawTopicMessageDF.map{
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true)  //isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) //isKey = false
        }

        //Pass the Avro schema.
        val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
        val deserializedValueJsonString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
    }

    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()

    //wait for the streaming query to terminate
    deserializedDSOutputStream.awaitTermination()
  }
}

The Kafka consumer is running fine and I can see the data tailing from the oplog; however, when I run the above code I get the errors below:

Error:(70, 59) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val deserializedTopicMessageDS = rawTopicMessageDF.map{

and

Error:(70, 59) not enough arguments for method map: (implicit evidence$7: org.apache.spark.sql.Encoder[DeserializedFromKafkaRecord])org.apache.spark.sql.Dataset[DeserializedFromKafkaRecord].
Unspecified value parameter evidence$7.
    val deserializedTopicMessageDS = rawTopicMessageDF.map{

Please suggest what I'm missing here.

Thanks in advance.

Upvotes: 0

Views: 618

Answers (1)

norbjd

Reputation: 11277

Just declare your case class DeserializedFromKafkaRecord outside of the main method.

I imagine that when the case class is defined inside main, Spark's implicit encoder magic does not work properly, since the case class does not exist before the main method executes.

The problem can be reproduced with a simpler example (without Kafka):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SimpleTest {

  // declare CaseClass outside of main method
  case class CaseClass(value: Int)

  def main(args: Array[String]): Unit = {

    // when case class is declared here instead
    // of outside main, the program does not compile
    // case class CaseClass(value: Int)

    val sparkSession = SparkSession
      .builder
      .master("local")
      .appName("simpletest")
      .getOrCreate()

    import sparkSession.implicits._

    val df: DataFrame = sparkSession.sparkContext.parallelize(1 to 10).toDF()
    val ds: Dataset[CaseClass] = df.map { row =>
      CaseClass(row.getInt(0))
    }

    ds.show()
  }
}
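
Applied to your Kafka code, the fix is just to hoist the case class up to the KafkaStream object; everything inside main can stay as it is (sketch, streaming code omitted):

import org.apache.spark.sql.SparkSession

object KafkaStream {

  // declared at object level, so Spark can derive an implicit Encoder for it
  case class DeserializedFromKafkaRecord(key: String, value: String)

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder
      .master("local")
      .appName("kafka")
      .getOrCreate()

    import sparkSession.implicits._

    // ... rest of the streaming code from the question, unchanged ...
  }
}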

Upvotes: 1
