Bagira

Spark streaming: Read column value from data frame for further processing

I am reading data using Spark Structured Streaming as follows:

df = spark.readStream.format("cloudFiles").options(**cloudfile).schema(schema).load(filePath)

and streaming is working as expected. I can see the values coming in with the following piece of code:

 from pyspark.sql.functions import input_file_name, count
 filesdf = df.withColumn("file", input_file_name()).groupBy("file").agg(count("*"))
 display(filesdf)

The filesdf dataframe shows the name of each file and its number of rows.

Next, I need to get the filename from the dataframe for further processing. How can I do this?

I searched the web and found the following:

 filename = filesdf.first()['file']
 print(filename)

but the above piece of code gives the following error:

Queries with streaming sources must be executed with writeStream.start();

Please suggest how I can read a column from a streaming dataframe for further processing.

Views: 903

Answers (1)

Bagira

I managed to solve the issue. The problem was that I was trying to work with the derived dataframe filesdf, when I should have worked with the original streaming dataframe df. With that, a command as simple as the following worked for me, saving the entire dataframe to a table:

df.writeStream.outputMode("append").toTable("members")

With this I am able to write the dataframe contents to a table named members.
