user2961127

Reputation: 1143

Kafka and pyspark program: Unable to determine why dataframe is empty

Below is my first program working with Kafka and PySpark. The code seems to run without exceptions, but the output of my queries is empty.

I initialize Spark and subscribe to the topic "quickstart-events", and I have produced messages for this topic from the terminal. But when I run this code, it gives me blank dataframes.

How do I resolve this?

Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, DataFrame
from pyspark.sql.types import StructType, ArrayType, StructField, IntegerType, StringType, DoubleType

spark = SparkSession.builder \
.appName("Spark-Kafka-Integration") \
.master("local[2]") \
.getOrCreate()

dsraw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "quickstart-events") \
.load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

rawQuery = dsraw \
        .writeStream \
        .queryName("query1")\
        .format("memory")\
        .start()

raw = spark.sql("select * from query1")
raw.show() # empty output

rawQuery = ds \
        .writeStream \
        .queryName("query2")\
        .format("memory")\
        .start()

raw = spark.sql("select * from query2")
raw.show()  # empty output
print("complete")

Output:

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

+---+-----+
|key|value|
+---+-----+
+---+-----+

Upvotes: 2

Views: 667

Answers (1)

Sky

Reputation: 2609

If you are just learning and experimenting with Kafka and Spark Structured Streaming, then this is fine.

Just use:

    import time

    while True:
        time.sleep(5)
        print("queryresult")
        raw.show()  # it will keep printing the result

instead of

    raw.show()  # it runs only once, that's why it is not printing the result

The streaming query runs in the background after `start()`, so a single `raw.show()` executes before the first micro-batch has been written to the in-memory table, and the dataframe is still empty.

Do NOT use this in production code.

It is better to write it like this:

spark = SparkSession.builder \
    .appName("Spark-Kafka-Integration") \
    .master("local[2]") \
    .getOrCreate()


dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart-events") \
    .load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

rawQuery = \
    ds \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

rawQuery.awaitTermination()

It will automatically print each micro-batch to the console.

Upvotes: 1
