
Reputation: 3209

How to write selected columns to Kafka topic?

I am using Spark SQL 2.4.1 with Java 1.8, together with spark-sql-kafka-0-10_2.11 (2.4.3) and kafka-clients 0.10.0.0.

StreamingQuery queryComapanyRecords = 
                .option("topic", "in_topic") 
                .option("auto.create.topics.enable", "false")
                .option("value.serializer", "")
                .option("checkpointLocation", "/app/chkpnt/" )


This gives the following error:

Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:71)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:71)
    at scala.Option.getOrElse(Option.scala:121)

I tried to fix it as shown below, but I am unable to send the value, which is a Java bean in my case.

  StreamingQuery queryComapanyRecords = 
                     comapanyRecords.selectExpr("CAST(company_id AS STRING) AS key", "to_json(struct(\"company_id\",\"fiscal_year\",\"fiscal_quarter\")) AS value")
                    .option("topic", "in_topic")

So is there any way in Java to handle/send this value (i.e. a Java bean as a record)?

Upvotes: 1

Views: 1421

Answers (1)

Jacek Laskowski

Reputation: 74779

The Kafka data source requires a specific schema for reading (loading) and writing (saving) datasets.

Quoting the official documentation (highlighting the most important field / column):

Each row in the source has the following schema:


value binary


In other words, when you read from a Kafka topic the Kafka records are in the value column, and when you write to a Kafka topic the data you want to save has to be available in the value column as well.

Put differently, whatever is or is going to be in Kafka is in the value column. The value column is where you "store" business records (the data).
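As a rough illustration of the read side, the sketch below loads a topic in batch mode and casts the binary key and value columns to strings. The broker address localhost:9092 is an assumption (the question does not give one); the topic name is reused from the question. In spark-shell, paste it with :paste so the multi-line chain is read as one expression.

```scala
// Assumption: a Kafka broker is reachable at localhost:9092
// and the topic "in_topic" already exists.
val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "in_topic")
  .load()

// key and value are loaded as binary columns; cast them to read them as text.
val decoded = records.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

decoded.show(truncate = false)
```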

On to your question:

How to write selected columns to Kafka topic?

You should "pack" the selected columns together so that they all become part of the value column. The to_json standard function is a good fit: the selected columns become a single JSON message.


Let me give you an example.

Don't forget to start a Spark application or spark-shell with the Kafka data source. Mind the versions of Scala (2.11 or 2.12) and Spark (e.g. 2.4.4).

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4

Let's start by creating a sample dataset. Any multiple-field dataset would work.

val ns = Seq((0, "zero")).toDF("id", "name")
scala> ns.show
+---+----+
| id|name|
+---+----+
|  0|zero|
+---+----+

If we tried to write the dataset to a Kafka topic as is, it would fail because the value column is missing. That's what you faced initially.

scala> ns.write.format("kafka").option("topic", "in_topic").save
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery$6(KafkaWriter.scala:71)
  at scala.Option.getOrElse(Option.scala:138)

You have to come up with a way to "pack" multiple fields (columns) together and make them available as the value column. The struct and to_json standard functions will do it.

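val vs = ns.withColumn("value", to_json(struct("id", "name")))
scala> vs.show(truncate = false)
+---+----+----------------------+
|id |name|value                 |
+---+----+----------------------+
|0  |zero|{"id":0,"name":"zero"}|
+---+----+----------------------+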
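The same packing can be sketched with the column names from the question, this time also producing the optional key column. The one-row companies dataset and its values are made up for illustration; only the column names come from the question.

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}
import spark.implicits._

// Hypothetical sample data; only the column names are taken from the question.
val companies = Seq((1, 2019, 1)).toDF("company_id", "fiscal_year", "fiscal_quarter")

// Keep just the two columns the Kafka sink looks at:
// key (here the company id as a string) and value (the selected columns as JSON).
val kv = companies.select(
  col("company_id").cast("string").as("key"),
  to_json(struct("company_id", "fiscal_year", "fiscal_quarter")).as("value"))

kv.printSchema()
kv.show(truncate = false)
```

The same struct and to_json functions are available to Java code as static methods of org.apache.spark.sql.functions.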

Saving to a Kafka topic should now be a breeze.

vs.write.format("kafka").option("topic", "in_topic").save
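Since the question uses a streaming query, here is a minimal streaming sketch of the same approach, not a definitive implementation. The assumptions: a broker at localhost:9092, and a rate source standing in for the question's comapanyRecords (the constant fiscal_year and fiscal_quarter values are invented). The topic and checkpoint location are reused from the question. Column names inside the SQL expression are left unquoted, because Spark SQL by default parses double-quoted names as string literals. Use :paste in spark-shell.

```scala
import org.apache.spark.sql.functions.{col, lit}

// Stand-in for the question's streaming dataset (assumption): the built-in
// rate source, reshaped to have the three column names from the question.
val comapanyRecords = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .select(
    col("value").as("company_id"),
    lit(2019).as("fiscal_year"),
    lit(1).as("fiscal_quarter"))

// Pack the selected columns into a JSON string named value.
val out = comapanyRecords.selectExpr(
  "CAST(company_id AS STRING) AS key",
  "to_json(struct(company_id, fiscal_year, fiscal_quarter)) AS value")

val query = out.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("topic", "in_topic")
  .option("checkpointLocation", "/app/chkpnt/")
  .start()

// Stop the query when done, e.g. query.stop()
```

The batch write above needs the kafka.bootstrap.servers option as well before the records can actually reach a broker.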

Upvotes: 3
