WangHeguan

Reputation: 25

How can I write a Dataframe into Kafka?

My Dataframe df looks like

[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

I created a Kafka sink for a streaming query, but I receive nothing from Kafka. Why?

ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

Upvotes: 0

Views: 1605

Answers (1)

Michael Heil

Reputation: 18495

You do not receive anything from Kafka because your code selects the columns key and value from a Dataframe that only has the columns age and name. You need to map your existing columns to key and value, as shown in the Scala code below.

Also, you do not need writeStream if your Dataframe is static. In that case, use write and save instead.

import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()

  // create DataFrame
  import spark.implicits._
  val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
  df.show(false)

  // +---+-----+
  // |age|name |
  // +---+-----+
  // |3  |Alice|
  // |5  |Bob  |
  // +---+-----+

  // write to Kafka as is with "age" as key and "name" as value
  df.selectExpr("CAST(age AS STRING) as key", "CAST(name AS STRING) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .save()
}
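Since the question uses PySpark, here is a rough Python sketch of the same batch write. The helper names kafka_projection and write_batch_to_kafka are made up for illustration, and df is assumed to be a static PySpark DataFrame with the columns age and name.

```python
def kafka_projection(key_col, value_col):
    """Build selectExpr strings that map two existing columns to key and value."""
    return [
        f"CAST({key_col} AS STRING) AS key",
        f"CAST({value_col} AS STRING) AS value",
    ]


def write_batch_to_kafka(df, key_col, value_col, servers, topic):
    """Write a static (non-streaming) DataFrame to Kafka using write and save.

    Hypothetical helper: df is assumed to be a pyspark.sql.DataFrame.
    """
    (
        df.selectExpr(*kafka_projection(key_col, value_col))
        .write.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("topic", topic)
        .save()
    )
```

For the Dataframe from the question this would be called as write_batch_to_kafka(df, "age", "name", "localhost:9092", "test-topic"). The Spark Kafka connector (spark-sql-kafka-0-10) has to be available to the Spark session for format("kafka") to resolve.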

If you want to store your data as a JSON string, you can apply the following (with df and spark.implicits._ from the code above still in scope):

  // convert columns into json string
  val df2 = df.select(col("name"),to_json(struct($"*"))).toDF("key", "value")
  df2.show(false)

  // +-----+------------------------+
  // |key  |value                   |
  // +-----+------------------------+
  // |Alice|{"age":3,"name":"Alice"}|
  // |Bob  |{"age":5,"name":"Bob"}  |
  // +-----+------------------------+
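The JSON variant can be sketched in PySpark in the same way. This assumes a static DataFrame df with the columns age and name. The helper names are hypothetical, and the value is built with the Spark SQL expression to_json(struct(*)) so that no extra imports are needed.

```python
def kafka_json_projection(key_col):
    """Build selectExpr strings: one column as key, the whole row as JSON value."""
    return [
        f"CAST({key_col} AS STRING) AS key",
        "to_json(struct(*)) AS value",
    ]


def write_rows_as_json(df, key_col, servers, topic):
    """Write each row of a static DataFrame to Kafka as a JSON string.

    Hypothetical helper: df is assumed to be a pyspark.sql.DataFrame.
    """
    (
        df.selectExpr(*kafka_json_projection(key_col))
        .write.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("topic", topic)
        .save()
    )
```

Calling write_rows_as_json(df, "name", "localhost:9092", "test-topic") should give messages keyed by name with values such as the JSON strings in the table above.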

Upvotes: 2
