Reputation: 731
I am reading log lines from a Kafka topic through Spark Structured Streaming, separating the fields of the log lines, performing some manipulations on the fields, and storing them in a dataframe with a separate column for each field. I want to write this dataframe to Kafka.
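For context, the read side looks roughly like the sketch below. The topic name, servers, and the parser are placeholders (the real log format and field extraction are not shown here); the hypothetical parser returns a tuple so that the result struct exposes fields _1, _2, and so on, matching the select further down.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("LogPipeline").getOrCreate()
import spark.implicits._

// Hypothetical parser: splits a space-delimited log line into a tuple,
// so the resulting struct column has fields _1, _2, ...
val parseLine = udf { line: String =>
  val fields = line.split(" ")
  (fields(0), fields(1), fields(2)) // the real parser extracts more fields
}

val dfProcessedLogs = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS line")
  .select(parseLine($"line").as("result"))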
Below are my sample dataframe and the writeStream I use to write it to Kafka:
val dfStructuredWrite = dfProcessedLogs.select(
  dfProcessedLogs("result").getItem("_1").as("col1"),
  dfProcessedLogs("result").getItem("_2").as("col2"),
  dfProcessedLogs("result").getItem("_17").as("col3"))
dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
The above code gives me the error below:
Required attribute 'value' not found
I believe this is because I don't have my dataframe in key/value format. How can I write my existing dataframe to Kafka in the most efficient way?
Upvotes: 1
Views: 1965
Reputation: 6613
The dataframe being written to Kafka should have the following columns in its schema:

- key (optional): string or binary
- value (required): string or binary
- topic (*optional): string (required only if the "topic" option is not set on the writer)

In your case there is no value column, so the exception is thrown.

You have to modify your select to produce at least a value column, for example:
import org.apache.spark.sql.functions.{concat, lit}
import spark.implicits._ // for the $"..." column syntax

dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
For more details, see the Structured Streaming + Kafka Integration Guide: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
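If you would rather keep the column structure than join the fields with spaces, one option (a sketch, not part of the original answer) is to serialize each row to JSON with to_json and struct. Note that the Kafka sink also requires a checkpoint location; the path below is a placeholder:
import org.apache.spark.sql.functions.{struct, to_json}

dfStructuredWrite
  .select(to_json(struct($"col1", $"col2", $"col3")).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/path/to/checkpoint") // placeholder path; required for streaming queries
  .start()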
Upvotes: 1