Reputation: 433
I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.
My program is shown below.
Whereas the program works when using a File sink (the batches are appended to text-format files in "result_dir"), I cannot see anything displayed when using the Console sink.
Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.
My question is: how can I make this program output to the Console sink and display the results when using Databricks?
Thank you very much in advance!
Best regards, Nacho
import pyspark
import pyspark.sql.functions
import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name=' to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern ', to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetic order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files in source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)
    start = time.time()

    # 3. We set a counter for the number of files being transferred
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    #    (i.e., the files are moved one by one per time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait the desired time_step_interval until the next time slot.
        time.sleep((start + (count * time_step_interval)) - time.time())

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    # 3. Operation C1: We create an unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text")\
                               .load(monitoring_dir)

    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...
    # 4.1. To either save to result_dir in append mode
    if console_sink == False:
        myDSW = inputUDF.writeStream.format("text")\
                        .option("path", result_dir)\
                        .option("checkpointLocation", checkpoint_dir)\
                        .trigger(processingTime="10 seconds")\
                        .outputMode("append")
    # 4.2. Or to display by console in append mode
    else:
        myDSW = inputUDF.writeStream.format("console")\
                        .trigger(processingTime="10 seconds")\
                        .outputMode("append")

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()
Upvotes: 7
Views: 5374
Reputation: 21
Just add this line after the section where you created the DataFrame:
display(dataframe)
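For the program in the question, that would mean displaying the streaming DataFrame right after it is created. A minimal sketch, reusing the inputUDF and monitoring_dir names from the question (display() is the Databricks notebook helper, which starts the stream and renders the arriving rows):

# Databricks cell: read the monitored folder as a streaming DataFrame
inputUDF = spark.readStream.format("text").load(monitoring_dir)

# display() starts a streaming query and shows the incoming rows in the notebook
display(inputUDF)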
Upvotes: 1
Reputation: 18485
"How can I make this program to output to Console sink and display the results when using Databricks?"
The easiest way is to use display
which Databricks provides. You can use it as shown below:
# Cell 1
rateDf = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 1)
          .option("numPartitions", 1)
          .load())
# Cell 2
display(rateDf, streamName="rate_stream")
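If you later need to stop the stream that display started, you can look it up by name. A small sketch, assuming Databricks registers the query under the streamName passed to display:

# Stop the displayed stream by matching its query name
for query in spark.streams.active:
    if query.name == "rate_stream":
        query.stop()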
The Console sink does not work in Databricks as you would expect it to work in your IDE or when submitting the application to your cluster. Instead, you can use the memory format and query the data with an %sql query:
inputUDF.writeStream \
    .format("memory") \
    .trigger(processingTime="10 seconds") \
    .queryName("inputUDF_console") \
    .outputMode("append") \
    .start()
In another Databricks cell you can look into the data by querying the in-memory table registered under the queryName:
%sql select * from inputUDF_console
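If you prefer to stay in Python instead of a %sql cell, the same in-memory table can be read with spark.sql; a minimal sketch, assuming the stream above is running:

# Query the in-memory table from a Python cell
spark.sql("select * from inputUDF_console").show(truncate=False)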
Upvotes: 4