Reputation: 121
I am working on a basic streaming app that reads streaming data from Kafka and processes it. Below is the code I am trying in PySpark:
spark = SparkSession.builder.appName("testing").getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","b1,b2,b3") \
.option("subscribe", "METRICS") \
.option("includeHeaders", "true")\
.option("truncate", False)\
.load()
df.printSchema()
display(df)
For streaming I am using readStream. I am running the code on Databricks and I am able to connect to the Kafka cluster, but the code gets stuck when I try to print the dataframe; nothing happens. I also tried to print the data to the console using the code below:
query = df.selectExpr("CAST(value AS STRING)") \
.writeStream \
.queryName("tarana")
.format("console") \
.outputMode("append") \
.start()
query.awaitTermination()
However, if I read the data from the Kafka cluster as a batch, I am able to read it and create a dataframe out of it:
spark = SparkSession.builder.appName("testing").getOrCreate()
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers","b1,b2,b3") \
.option("subscribe", "METRICS") \
.option("includeHeaders", "true")\
.option("truncate", False)\
.load()
df.printSchema()
display(df)
The batch code above runs perfectly fine, but the streaming version does not; it just gets stuck.
Any suggestions on how I can debug this, or what I can add to make it work?
Upvotes: 0
Views: 528
Reputation: 87259
On Databricks it's recommended to use the display function instead of the console
sink, because the console output goes into the driver/executor logs and is not visible in the notebook. And because you use query.awaitTermination(),
the cell will never finish.
Just do:
display(df.selectExpr("CAST(value AS STRING)"))
Upvotes: 0