Reputation: 2243
I am reading data using Spark Structured Streaming as follows:
df = spark.readStream.format("cloudFiles").options(**cloudfile).schema(schema).load(filePath)
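The `cloudfile` options dict is not shown in the question. For context, a minimal sketch of what it might contain, assuming Databricks Auto Loader with JSON input (the format and schema-location values here are hypothetical, not from the question):

```python
# Hypothetical Auto Loader options (the question's `cloudfile` dict is not
# shown; these values are assumptions for illustration only).
cloudfile = {
    "cloudFiles.format": "json",                     # assumed source file format
    "cloudFiles.schemaLocation": "/tmp/schema_loc",  # hypothetical path for schema tracking
}
```

These options would then be unpacked into the reader via `.options(**cloudfile)` as shown above.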
and streaming is working as expected. I can see the values coming in with the following piece of code:
from pyspark.sql.functions import input_file_name,count
filesdf = (df.withColumn("file", input_file_name()).groupBy("file").agg(count("*")))
display(filesdf)
The filesdf dataframe prints the file name and the number of rows per file.
Next I need to get the filename from the dataframe for further processing. How can I do this?
I searched the web and found the following:
filename = filesdf.first()['file']
print(filename)
but the above piece of code gives the following error:
Queries with streaming sources must be executed with writeStream.start();
Please suggest how I can read a column from a streaming dataframe for further processing.
Upvotes: 0
Views: 903
Reputation: 2243
I managed to solve the issue. The problem was that I was trying to work with the derived dataframe filesdf, when I should have worked with the original streaming dataframe df. With that, a command as simple as the following worked for me, saving the entire dataframe to a table:
df.writeStream.outputMode("append").toTable("members")
With this I am able to write the dataframe contents to a table named members.
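If you also need the file names themselves for per-file processing (the original goal in the question), one common Structured Streaming pattern is `foreachBatch`: each micro-batch arrives as a static DataFrame, so actions like `.collect()` that fail on a streaming DataFrame are allowed inside the batch function. A minimal sketch; the `process_batch` name and the per-file processing step are placeholders, not from the original answer:

```python
def process_batch(batch_df, batch_id):
    # Each micro-batch is a *static* DataFrame, so actions are allowed here.
    files = [row["file"] for row in batch_df.select("file").distinct().collect()]
    for filename in files:
        pass  # hypothetical per-file processing goes here
    return files

# Wiring it into the stream (requires a running Spark session; not run here):
# from pyspark.sql.functions import input_file_name
# (df.withColumn("file", input_file_name())
#    .writeStream
#    .foreachBatch(process_batch)
#    .start())
```

This keeps the streaming query itself simple while still giving you a place to run arbitrary per-file logic on each batch.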
Upvotes: 0