Reputation: 13
I am using the Kafka + Spark Structured Streaming integration, where I send a case class object (Website) to Kafka and map it into a Dataset in Spark.
case class Website(id: Int, name: String)
implicit val productSchema = Encoders.product[Website]
val website = Website(1, "lokesh")
EmbeddedKafka.publishToKafka(topic, website.toString)(config, new StringSerializer)
val df: Dataset[Website] = spark
  .readStream
  .format("kafka")
  .option("subscribe", topic)
  .option("kafka.bootstrap.servers", "localhost:1244")
  .option("startingOffsets", "earliest")
  .load()
  .select("value")
  .as[Website]
I get the error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'id' given input columns: [value];
Upvotes: 1
Views: 433
Reputation: 74739
tl;dr Use a proper serialization format, e.g. JSON or Avro.
The following code sends out a textual representation of the Website case class:
EmbeddedKafka.publishToKafka(topic, website.toString)(config, new StringSerializer)
The following code takes the textual representation as Array[Byte]:
.select("value")
So you should first cast the value to a string and then parse it to build an object out of it, e.g. .select($"value" cast "string").
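For example, a minimal sketch of the cast step (raw is a hypothetical name for the DataFrame returned by .load() in the question's code; $ requires import spark.implicits._):

import spark.implicits._

// Kafka's value column is binary (Array[Byte]); cast it to a string
// before attempting to parse it into anything structured.
val lines = raw.select($"value" cast "string" as "value")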
With that, you'd be better off sending out a JSON representation of the Website object, which would make parsing so much easier. You could also use a comma-separated "serialization format", but that would require that your Websites never contain a comma in any field.
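For illustration, a sketch of the JSON variant, reusing topic, config, website, spark and the implicit productSchema encoder from the question. The JSON string is hand-rolled here to keep the example dependency-free; in real code you'd use a JSON library such as circe or json4s:

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Producer side: publish a JSON representation instead of toString.
val json = s"""{"id":${website.id},"name":"${website.name}"}"""
EmbeddedKafka.publishToKafka(topic, json)(config, new StringSerializer)

// Consumer side: cast the binary value to a string, parse the JSON using the
// case class schema, and flatten the struct so id and name become top-level columns.
val websites: Dataset[Website] = spark
  .readStream
  .format("kafka")
  .option("subscribe", topic)
  .option("kafka.bootstrap.servers", "localhost:1244")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_json($"value" cast "string", productSchema.schema) as "website")
  .select("website.*") // id, name
  .as[Website]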
AnalysisException: cannot resolve 'id' given input columns: [value]
The above exception says that you want to make an object of type Website (that is made up of the id and name fields) out of value, which is clearly not possible.
After .select("value") the only column is value (obviously), and so .as[Website] is not possible as there are no "parts" of the Website type to fill out (by name).
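To make the name-matching concrete, a tiny batch sketch (hypothetical data, same Website case class and implicit encoder as above):

import spark.implicits._

// Works: the id and name columns line up with Website's fields by name.
val ok = Seq((1, "lokesh")).toDF("id", "name").as[Website]

// Fails with the very same AnalysisException: there is no id column to resolve.
val ko = Seq("Website(1,lokesh)").toDF("value").as[Website]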
Upvotes: 1