Khumar

Write a PySpark DataFrame to Kafka

I have a PySpark DataFrame that I want to write to a Kafka topic.

df.show(n=5)
+-------+---------+
|county |category |
+-------+---------+
|Albany1|  Animal3|
|Albany2|  Animal5|
|Albany3|  Animal1|
|Albany4|  Animal2|
|Albany5|  Animal4|
+-------+---------+

df.printSchema()

root
 |-- county: string (nullable = true)
 |-- category: string (nullable = true)

Code I have tried:

df.selectExpr("to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host:port") \
.option("topic", "test") \
.save()

I am getting the error below:

 java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.

Please help. Thank you!
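A likely cause, assuming the Spark version is older than 2.2 (the question does not say which version is used): batch writes through df.write.format("kafka") were only added in Spark 2.2, and older versions fail with this kind of "does not allow create table as select" error because their Kafka provider cannot act as a batch sink. The connector must also be on the classpath, typically via --packages with a spark-sql-kafka-0-10 coordinate matching the Spark and Scala versions. The two helpers below are hypothetical, a small sketch for checking both; in a real session you would pass spark.version instead of a literal string.

```python
def kafka_batch_write_supported(spark_version: str) -> bool:
    """Hypothetical helper: True if df.write.format("kafka") batch writes exist (Spark 2.2+)."""
    # Only major and minor matter, e.g. "2.1.1" -> (2, 1).
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    return (major, minor) >= (2, 2)


def kafka_package(spark_version: str, scala_version: str) -> str:
    """Hypothetical helper: build the Kafka connector coordinate to pass to --packages."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


print(kafka_batch_write_supported("2.1.1"))  # False
print(kafka_batch_write_supported("2.4.8"))  # True
print(kafka_package("2.4.8", "2.11"))  # org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8
```

If the check returns False, upgrading Spark (rather than changing the write code) is what makes the batch Kafka sink available.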

Answers (1)

Rene B.

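I had the same error and could get rid of it by adding .format("console") before .save(). Note that this second .format() call replaces "kafka" as the data source, so the rows go to the console sink and nothing is sent to the "test" topic: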

df.selectExpr("to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host:port") \
.option("topic", "test") \
.format("console") \
.save()
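Both snippets build the message payload with to_json(struct(*)) AS value: the Kafka sink reads each message's payload from a column named value (an optional key column becomes the message key), so every row is serialized as one JSON object. As a rough plain-Python illustration (not Spark itself; the helper name to_kafka_value is hypothetical), the first two sample rows would produce these values:

```python
import json

# First two rows from the df.show() output in the question.
rows = [
    {"county": "Albany1", "category": "Animal3"},
    {"county": "Albany2", "category": "Animal5"},
]


def to_kafka_value(row: dict) -> str:
    """Hypothetical stand-in for to_json(struct(*)).

    Keeps column order and uses compact separators; it only matches Spark's
    output for non-null ASCII string columns like the sample data.
    """
    return json.dumps(row, separators=(",", ":"))


values = [to_kafka_value(row) for row in rows]
for value in values:
    print(value)
# {"county":"Albany1","category":"Animal3"}
# {"county":"Albany2","category":"Animal5"}
```

A consumer of the topic would then json-decode each message value to get the county and category fields back.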
