Reputation: 3209
I am using Spark SQL 2.4.1 with Java 1.8, and the Kafka libraries spark-sql-kafka-0-10_2.11:2.4.3 and kafka-clients 0.10.0.0.
StreamingQuery queryComapanyRecords =
    comapanyRecords
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("topic", "in_topic")
        .option("auto.create.topics.enable", "false")
        .option("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.serializer", "com.spgmi.ca.prescore.serde.MessageRecordSerDe")
        .option("checkpointLocation", "/app/chkpnt/")
        .outputMode("append")
        .start();
queryComapanyRecords.awaitTermination();
It gives this error:
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:71)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:71)
at scala.Option.getOrElse(Option.scala:121)
I tried to fix it as below, but I am unable to send the value, which in my case is a Java bean.
StreamingQuery queryComapanyRecords =
    comapanyRecords
        .selectExpr("CAST(company_id AS STRING) AS key",
            "to_json(struct(\"company_id\",\"fiscal_year\",\"fiscal_quarter\")) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("topic", "in_topic")
        .start();
So is there any way in Java to handle/send this value (i.e. a Java bean as the record)?
Upvotes: 1
Views: 1421
Reputation: 74779
The Kafka data source requires a specific schema for reading (loading) and writing (saving) datasets.
Quoting the official documentation (highlighting the most important field / column):

Each row in the source has the following schema:
...
value binary
...

In other words, you have Kafka records in the value column when reading from a Kafka topic, and you have to make the data you want to save to a Kafka topic available in the value column as well. Whatever is, or is going to be, in Kafka sits in the value column. The value column is where you "store" the business records (the data).
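To see that schema yourself, load a topic and print the schema. Here is a minimal Java sketch (the SparkSession variable spark and the broker address are assumptions; in_topic comes from your question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Inspect the fixed schema of the Kafka data source.
// Broker address is an assumption; "in_topic" is from the question.
Dataset<Row> kafkaRows = spark.read()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "in_topic")
    .load();
kafkaRows.printSchema();
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)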
On to your question:

How to write selected columns to a Kafka topic?

You should "pack" the selected columns together so they can all be part of the value column. The to_json standard function is a good fit, so the selected columns end up as a single JSON message.
Let me give you an example.
Don't forget to start your Spark application or spark-shell with the Kafka data source. Mind the versions of Scala (2.11 or 2.12) and Spark (e.g. 2.4.4).

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
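If you run a compiled Java application rather than spark-shell, the equivalent is declaring the connector as a dependency, e.g. in Maven (a sketch; match the Scala suffix and version to your Spark build):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.4</version>
</dependency>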
Let's start by creating a sample dataset. Any multiple-field dataset would work.
val ns = Seq((0, "zero")).toDF("id", "name")
scala> ns.show
+---+----+
| id|name|
+---+----+
| 0|zero|
+---+----+
If we tried to write the dataset to a Kafka topic, it would error out due to the missing value column. That's what you faced initially.
scala> ns.write.format("kafka").option("topic", "in_topic").save
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery$6(KafkaWriter.scala:71)
at scala.Option.getOrElse(Option.scala:138)
...
You have to come up with a way to "pack" multiple fields (columns) together and make the result available as the value column. The struct and to_json standard functions will do it.
val vs = ns.withColumn("value", to_json(struct("id", "name")))
scala> vs.show(truncate = false)
+---+----+----------------------+
|id |name|value |
+---+----+----------------------+
|0 |zero|{"id":0,"name":"zero"}|
+---+----+----------------------+
Saving to a Kafka topic should now be a breeze.
vs.write.format("kafka").option("topic", "in_topic").save
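And since you asked about Java: the same packing works with the Column API. Note one pitfall in your selectExpr attempt: in Spark SQL, double-quoted names such as "company_id" are string literals, so struct("company_id", ...) packs constant strings rather than your columns. Below is a minimal Java sketch for the streaming case, reusing comapanyRecords, KAFKA_BROKER, the topic, and the checkpoint path from your question:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

// Pack the bean's fields into a JSON string in the mandatory value column.
StreamingQuery query = comapanyRecords
    .select(
        col("company_id").cast("string").alias("key"),
        to_json(struct(col("company_id"), col("fiscal_year"), col("fiscal_quarter"))).alias("value"))
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("topic", "in_topic")
    .option("checkpointLocation", "/app/chkpnt/")
    .outputMode("append")
    .start();
query.awaitTermination();

The selectExpr variant works too, as long as the column names inside struct are unquoted: "to_json(struct(company_id, fiscal_year, fiscal_quarter)) AS value".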
Upvotes: 3