Nacho Castiñeiras

Reputation: 433

How to print out Structured Stream in Console format

I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.

My program (myTest.py, listed in full below):

The program works when I choose the File sink (the batches are appended to text-format files in "result_dir"), but nothing is displayed when I choose the Console sink.

Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.

My question is: how can I make this program output to the Console sink and display the results when using Databricks?

Thank you very much in advance!

Best regards, Nacho


My Program: myTest.py

import pyspark
import pyspark.sql.functions

import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------ 
def get_source_dir_file_names(source_dir):

    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:      
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name= to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern ', to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetical order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------ 
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We wait a small initial delay and then get the starting time of the process
    time.sleep(time_step_interval * 0.1)

    start = time.time()

    # 3. We set a counter for the number of files transferred so far
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    # (i.e., the files are moved one by one, one per time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait the desired time_step_interval until the next time slot
        #      (never sleeping a negative amount if copying the file took longer than the interval).
        time.sleep(max(0.0, (start + (count * time_step_interval)) - time.time()))

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------ 
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)  

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)    

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')    

    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text")\
                               .load(monitoring_dir)

    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...

    # 4.1. To either save to result_dir in append mode  
    if not console_sink:
        myDSW = inputUDF.writeStream.format("text")\
                                    .option("path", result_dir) \
                                    .option("checkpointLocation", checkpoint_dir)\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   
    # 4.2. Or to display by console in append mode    
    else:
        myDSW = inputUDF.writeStream.format("console")\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()    

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()

My Dataset: f1.txt

First sentence.

Second sentence.


My Dataset: f2.txt

Third sentence.

Fourth sentence.


My Dataset: f3.txt

Fifth sentence.

Sixth sentence.

Upvotes: 7

Views: 5374

Answers (2)

Santhosh

Reputation: 21

Just add this line after the section where you create the DataFrame:

display(dataframe)
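
Applied to the program in the question (a minimal sketch, assuming the streaming DataFrame is the inputUDF created in operation C1), that would look like:

# Databricks' display starts the streaming query itself and renders the
# arriving rows in the notebook cell, so no writeStream/start is needed
# just for viewing the data.
display(inputUDF)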

Upvotes: 1

Michael Heil

Reputation: 18485

"How can I make this program to output to Console sink and display the results when using Databricks?"

The easiest way is to use the display function that Databricks provides. You can use it as shown below:

# Cell 1
rateDf = (spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load())

# Cell 2
display(rateDf, streamName="rate_stream")
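
display starts the streaming query under the hood. If you later want to stop it programmatically (a sketch, assuming the streamName passed to display becomes the query name), you can go through spark.streams:

# Stop the query that display() started, looking it up by its name
for query in spark.streams.active:
    if query.name == "rate_stream":
        query.stop()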

The Console sink does not work in Databricks the way it does in your IDE or when submitting the application to a cluster. Instead, you can use the memory format and query the data with a %sql query:

inputUDF.writeStream \
  .format("memory") \
  .trigger(processingTime = "10 seconds") \
  .queryName("inputUDF_console") \
  .outputMode("append") \
  .start()  

In another Databricks cell you can then look at the data by querying the table named after the queryName:

%sql select * from inputUDF_console
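
If you prefer to stay in a Python cell (a small sketch using the same queryName as above), the in-memory table registered by the memory sink can also be read with spark.sql:

# Query the in-memory table registered by the memory sink and print a few rows
spark.sql("select * from inputUDF_console").show(truncate=False)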

Upvotes: 4
