Reputation: 25
My DataFrame df looks like:
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
I am creating a Kafka sink for streaming queries, but I receive nothing from Kafka. Why?
ds = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic1") \
    .start()
Upvotes: 0
Views: 1605
Reputation: 18495
You will not receive anything from Kafka because your code selects the columns key and value from a DataFrame that only has the columns age and name. The Kafka sink expects a column named value (and optionally key), so you need to select and rename your columns as shown below.
Also, you do not need writeStream if your DataFrame is static. In that case, use write and save instead.
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession
object Main extends App {
  val spark = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()
  // create DataFrame
  import spark.implicits._
  val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
  df.show(false)
  // +---+-----+
  // |age|name |
  // +---+-----+
  // |3  |Alice|
  // |5  |Bob  |
  // +---+-----+
  // write to Kafka as is with "age" as key and "name" as value
  df.selectExpr("CAST(age AS STRING) as key", "CAST(name AS STRING) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .save()
}
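Since the question uses PySpark, here is a rough Python sketch of the same batch write. The helper names kafka_exprs and write_static_to_kafka are made up for illustration, and the broker address and topic are placeholders. It assumes df is a static DataFrame with the columns age and name, and that the spark-sql-kafka-0-10 package is available to your Spark session.

```python
def kafka_exprs(key_col, value_col):
    """Build selectExpr strings that rename two columns to the
    key/value pair the Kafka sink looks for."""
    return [
        f"CAST({key_col} AS STRING) AS key",
        f"CAST({value_col} AS STRING) AS value",
    ]


def write_static_to_kafka(df, servers, topic, key_col="age", value_col="name"):
    """Batch-write a static DataFrame to Kafka (write/save, not writeStream/start)."""
    (df.selectExpr(*kafka_exprs(key_col, value_col))
       .write
       .format("kafka")
       .option("kafka.bootstrap.servers", servers)
       .option("topic", topic)
       .save())
```

You would call it as write_static_to_kafka(df, "localhost:9092", "test-topic"). If df really is a streaming DataFrame, keep writeStream and start() but still rename the columns to key and value.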
If you want to store each row as a JSON string, you can apply the following:
// convert columns into json string
val df2 = df.select(col("name"), to_json(struct($"*"))).toDF("key", "value")
df2.show(false)
// +-----+------------------------+
// |key  |value                   |
// +-----+------------------------+
// |Alice|{"age":3,"name":"Alice"}|
// |Bob  |{"age":5,"name":"Bob"}  |
// +-----+------------------------+
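For PySpark users, the JSON variant could look roughly like this. The function name to_key_json_value is hypothetical, and the sketch assumes df has a name column to use as the key. It relies on the Spark SQL expression to_json(struct(*)), which packs all columns of a row into one JSON string.

```python
def to_key_json_value(df, key_col="name"):
    """Return a DataFrame with `key` taken from key_col and `value`
    holding the whole row serialized as a JSON string."""
    return df.selectExpr(
        f"CAST({key_col} AS STRING) AS key",
        "to_json(struct(*)) AS value",
    )
```

The result has exactly the key and value columns, so it can be passed straight to .write.format("kafka") with the same options as above.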
Upvotes: 2