Lokesh Aggarwal

Reputation: 13

How to process Scala case class objects from Kafka using streaming queries?

I am using the Kafka + Spark integration: I publish a case class object (Website) to Kafka and want to read it back in Spark as a Dataset[Website].

    case class Website(id: Int, name: String)

    implicit val productSchema = Encoders.product[Website]
    val website = Website(1,"lokesh")
    EmbeddedKafka.publishToKafka(topic, website.toString)(config,new StringSerializer)

    val df:Dataset[Website] = spark
          .readStream
          .format("kafka")
          .option("subscribe", topic)
          .option("kafka.bootstrap.servers", "localhost:1244")
          .option("startingoffsets", "earliest")
          .load()
          .select("value") 
          .as[Website]

I get the error

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'id' given input columns: [value];

Upvotes: 1

Views: 433

Answers (1)

Jacek Laskowski

Reputation: 74739

tl;dr Use a proper serialization format, e.g. JSON or Avro.


The following code sends out a textual representation of the Website case class.

    EmbeddedKafka.publishToKafka(topic, website.toString)(config, new StringSerializer)

The following code takes the textual representation as Array[Byte]:

    .select("value")

So you'd have to cast the value to a string, e.g. .select($"value" cast "string"), and then parse that string yourself to build a Website object.
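
As a rough illustration of the "parse it yourself" route, here is a minimal sketch in plain Scala that turns the toString output of the case class back into an object. The helper name parseWebsite and the regular expression are my own assumptions, not part of any library, and the approach breaks as soon as the case class changes shape.

```scala
import scala.util.Try

case class Website(id: Int, name: String)

object WebsiteParser {
  // Matches the default case class toString, e.g. "Website(1,lokesh)".
  // Hypothetical helper: the pattern is an assumption about that format.
  private val Pattern = """Website\((-?\d+),(.*)\)""".r

  // Returns None when the text does not look like a Website
  // or when the id does not fit into an Int.
  def parseWebsite(text: String): Option[Website] = text match {
    case Pattern(id, name) => Try(id.toInt).toOption.map(Website(_, name))
    case _                 => None
  }
}
```

Something like this could be applied to each string value after the cast, but it is fragile, which is why a real serialization format is the better choice.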

That said, you'd be better off sending out a JSON representation of the website object, which would make parsing much easier. You could also use a comma-separated "serialization format", but that would require that no field of a website contains a comma.
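
A minimal sketch of the JSON route, assuming the producer publishes a JSON string instead of website.toString. The object WebsiteStream and the helpers toJson and readWebsites are hypothetical names for illustration. The hand-written JSON does no escaping, so a JSON library would be the safer option in real code.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

case class Website(id: Int, name: String)

object WebsiteStream {
  // Illustration only: no escaping of quotes or backslashes in name.
  def toJson(w: Website): String = s"""{"id":${w.id},"name":"${w.name}"}"""

  def readWebsites(spark: SparkSession, topic: String, servers: String): Dataset[Website] = {
    import spark.implicits._
    // Derive the schema (id, name) from the case class itself.
    val schema = Encoders.product[Website].schema
    spark.readStream
      .format("kafka")
      .option("subscribe", topic)
      .option("kafka.bootstrap.servers", servers)
      .option("startingOffsets", "earliest")
      .load()
      // value is binary: cast it to a string, then parse the JSON into a struct.
      .select(from_json(col("value").cast("string"), schema).as("website"))
      // Flatten the struct so the columns are named id and name.
      .select("website.*")
      .as[Website]
  }
}
```

On the producer side that would mean publishing WebsiteStream.toJson(website) rather than website.toString. With the id and name columns in place, .as[Website] has something to resolve the fields against.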


AnalysisException: cannot resolve 'id' given input columns: [value]

The above exception says that you are trying to build an object of type Website (which is made up of the id and name fields) from the single value column, which is not possible.

After .select("value") the only column is value, so .as[Website] fails: there are no columns named id or name to fill in the fields of Website (they are matched by name).

Upvotes: 1
