Nikhil

Reputation: 121

Not able to read data through kafka spark streaming in pyspark

I am working on creating a basic streaming app which reads streaming data from Kafka and processes it. Below is the code I am trying in PySpark:

spark = SparkSession.builder.appName("testing").getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","b1,b2,b3") \
.option("subscribe", "METRICS") \
.option("includeHeaders", "true")\
.option("truncate", False)\
.load()


df.printSchema()
display(df)

Here I am using readStream for streaming. I am running the code on Databricks, and I am able to connect to the Kafka cluster. But the code gets stuck when I try to print the dataframe; nothing happens. I also tried to print the data to the console using the code below:

query = df.selectExpr("CAST(value AS STRING)") \
.writeStream \
.queryName("tarana") \
.format("console") \
.outputMode("append") \
.start()

query.awaitTermination()

However, if I read the data as a batch from the Kafka cluster, I am able to read it and create a dataframe out of it:

spark = SparkSession.builder.appName("testing").getOrCreate()
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers","b1,b2,b3") \
.option("subscribe", "METRICS") \
.option("includeHeaders", "true")\
.option("truncate", False)\
.load()


df.printSchema()
display(df)

The batch code above runs perfectly fine, but the streaming version does not; it just gets stuck.

Any suggestions on how I can debug this, or what I can add to make it work?
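For reference, one difference between the two paths that might matter: for the Kafka source, batch reads default to startingOffsets = "earliest", while streaming reads default to "latest", so a stream can sit idle until new records arrive. A sketch of the streaming read with the offset set explicitly (broker list and topic name as in my code above):

```python
# streaming read with explicit startingOffsets; by default readStream
# uses "latest" for Kafka, unlike batch read which uses "earliest"
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "b1,b2,b3") \
    .option("subscribe", "METRICS") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "earliest") \
    .load()
```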

Upvotes: 0

Views: 528

Answers (1)

Alex Ott

Reputation: 87259

On Databricks it's recommended to use the display function instead of the console sink, because the console output goes into the driver/executor logs and is not visible in the notebook. And because you use query.awaitTermination(), the cell will never finish.

Just do:

display(df.selectExpr("CAST(value AS STRING)"))
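Note that display keeps the streaming query running until you stop it. When you are done debugging, you can stop all active streams on the session, e.g. (a minimal sketch using the StreamingQueryManager API):

```python
# stop every streaming query still active on this SparkSession
for q in spark.streams.active:
    q.stop()
```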

Upvotes: 0
