AbhiK

Reputation: 279

Is it possible to register a Spark Structured Streaming DataFrame as a SQL temporary view?

I am reading data from a Kafka topic using Spark Structured Streaming, and I want to run SQL queries on this streaming data.

Following is the code:

from pyspark.sql import SparkSession, SQLContext

def process_batch(df, id):
    # Here I want to run SQL queries on the batch DataFrame,
    # but it fails with "Table or view not found"
    spark = spark_session()
    df.createOrReplaceTempView("x")
    spark.sql("select * from x")

def spark_session():
    spark = SparkSession \
        .builder \
        .appName("Python kafka Spark example") \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1') \
        .getOrCreate()

    return spark

def main():

    spark = spark_session()

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df.writeStream.foreachBatch(process_batch).start()


    query.awaitTermination()


if __name__ == "__main__":
    main()

Error:

org.apache.spark.sql.AnalysisException: Table or view not found: x;

Upvotes: 0

Views: 1940

Answers (2)

hritik more

Reputation: 1

Instead of a temp view, can't we use a global temp view?
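
As a rough sketch only (assuming the same Kafka source and the spark_session() helper from the question; the view name x and the epoch_id parameter are just examples), the foreachBatch callback could register a global temp view. Global temp views are tied to the application rather than to a single SparkSession and are exposed through the reserved global_temp database, so they should be visible from another session:

def process_batch(df, epoch_id):
    # Register the micro-batch as a global temp view; it lives in the
    # reserved global_temp database shared by all sessions of this application.
    df.createOrReplaceGlobalTempView("x")

    # Query it from the (possibly different) session via the global_temp database.
    spark = spark_session()
    spark.sql("SELECT key, value FROM global_temp.x").show()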

Upvotes: 0

AbhiK

Reputation: 279

I created a new DataFrame from the existing batch DataFrame and ran SQL queries over it; this approach solved the problem.

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

def process_batch(df, id):
    df.show()

    # Pull the micro-batch to the driver as a list of Rows.
    df1 = df.collect()

    spark = spark_session()

    # Rebuild a simple (key, value) string schema for the collected rows.
    schemaString = "key value"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Create a fresh DataFrame on this session, register it as a temp view,
    # and run SQL against it.
    df2 = spark.createDataFrame(df1, schema)
    df2.createOrReplaceTempView("x")
    spark.sql("SELECT value FROM x LIMIT 2").show()

def spark_session():
    spark = SparkSession \
        .builder \
        .appName("Python kafka Spark example") \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1') \
        .getOrCreate()

    return spark


def main():

    spark = spark_session()

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # query = df \
    #             .writeStream \
    #                     .outputMode("append") \
    #                     .format("console") \
    #                     .start()

    #spark.sql("select * from test").show()

    query = df.writeStream.foreachBatch(process_batch).start()


    query.awaitTermination()


if __name__ == "__main__":
    main()
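
A lighter-weight variant, sketched here only under the assumption of PySpark 2.4 (where the batch DataFrame exposes the SQLContext it is bound to as df.sql_ctx), would skip the collect() and register the micro-batch DataFrame itself, querying it through that same context:

def process_batch(df, epoch_id):
    # Register the micro-batch directly; the temp view lives in the session
    # the batch DataFrame is bound to, so query it through that context.
    df.createOrReplaceTempView("x")
    df.sql_ctx.sql("SELECT value FROM x LIMIT 2").show()

This avoids pulling the whole micro-batch to the driver, which the collect()-based approach does.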

Upvotes: 1
