hampi2017

Reputation: 731

Writing streaming dataframe to kafka

I am reading log lines from a Kafka topic through Spark Structured Streaming, separating the fields of the log lines, performing some manipulations on the fields, and storing them in a dataframe with a separate column for every field. I want to write this dataframe to Kafka.

Below are my sample dataframe and the writeStream call for writing it to Kafka:

val dfStructuredWrite = dfProcessedLogs.select(
  dfProcessedLogs("result").getItem("_1").as("col1"),
  dfProcessedLogs("result").getItem("_2").as("col2"),
  dfProcessedLogs("result").getItem("_17").as("col3"))

dfStructuredWrite
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

The above code gives me the error below:

Required attribute 'value' not found

I believe this is because I don't have my dataframe in key/value format. How can I write my existing dataframe to Kafka in the most efficient way?

Upvotes: 1

Views: 1965

Answers (1)

Bartosz Wardziński

Reputation: 6613

The Dataframe being written to Kafka should have the following columns in its schema:

  • key (optional) (type: string or binary)
  • value (required) (type: string or binary)
  • topic (optional) (type: string)

In your case there is no value column, so the exception is thrown.

You have to modify it to add at least a value column, e.g.:

import org.apache.spark.sql.functions.{concat, lit}
import spark.implicits._ // for the $"..." column syntax (assumes a SparkSession named spark)

dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
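
The optional key column from the list above can be filled in the same way. This snippet is a sketch, assuming col1 is a sensible partitioning key (records with the same key land in the same Kafka partition):

// Optional: add a "key" column (string or binary) alongside "value".
// Assumes col1 is a reasonable partitioning key; cast to string to satisfy the sink schema.
dfStructuredWrite.select(
  $"col1".cast("string").alias("key"),
  concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))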

For more details you can check: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
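
Putting it together, here is a minimal end-to-end sketch, this time serializing the row as JSON (a common alternative to concatenation, not part of the answer above, that keeps the fields distinguishable for the consumer). Note that the Kafka sink also requires a checkpoint location; the path below is a placeholder:

import org.apache.spark.sql.functions.{to_json, struct}

// Pack all columns into one JSON string for the required "value" column.
dfStructuredWrite
  .select(to_json(struct($"col1", $"col2", $"col3")).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  // Structured Streaming requires a checkpoint directory for the Kafka sink;
  // "/tmp/kafka-sink-checkpoint" is a placeholder path.
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()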

Upvotes: 1
