ERJAN

Reputation: 24500

How to suppress the stdout 'Batch' output when streaming with Spark?

How can I change or totally suppress this batch metadata and only show my own output?

-------------------------------------------
Batch: 62
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value|      topic|partition|offset|           timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [32]|transaction|        0|335793|2020-07-27 15:10:...|            0|
+----+-----+-----------+---------+------+--------------------+-------------+

-------------------------------------------
Batch: 63
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value|      topic|partition|offset|           timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [33]|transaction|        0|335794|2020-07-27 15:10:...|            0|
+----+-----+-----------+---------+------+--------------------+-------------+

-------------------------------------------

The spark_job.py code:

from pyspark.sql import SparkSession

KAFKA_TOPIC_NAME_CONS = "transaction"
KAFKA_BOOTSTRAP_SERVERS_CONS = '127.0.0.1:9092'
project_path = 'C:/Users/Admin/Desktop/kafka_project'

if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Demo Application Started ...")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraLibrary", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.driver.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    incoming_read = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
        .option("startingOffsets", "earliest") \
        .load()

    query1 = incoming_read.writeStream.format("console").start()

    # awaitTermination() is a method of the streaming query, not of the
    # DataFrame; it blocks until the query stops or fails, so the print
    # below is only reached after termination.
    query1.awaitTermination()

    print("PySpark demo app finished.")

The producer.py runs forever, sending the numbers 0 to 6 at one-second intervals:

# coding=utf8
from time import sleep
from json import dumps
from kafka import KafkaProducer

# serialize each value to JSON bytes before sending
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

topic = 'transaction'

while True:
    print('restarting the loop...')
    for i in range(7):  # yields 0..6
        print('producing for topic %s this blob: %s' % (topic, i))
        producer.send(topic, value=i)
        sleep(1)

Also, how can I actually see the last line, 'PySpark demo app finished.'?

Do I need to stop the producer and just wait for spark_job.py to time out? Using Spark 2.4.6 and Python 3.7.

Upvotes: 1

Views: 181

Answers (1)

Michael Heil

Reputation: 18475

Looking at the code of the class ConsoleWrite, it is not possible to suppress or change the "Batch" output when using the console sink, because the header is hard-coded:

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
  // We have to print a "Batch" label for the epoch for compatibility
  // with the pre-data source V2 behavior.
  printRows(messages, schema, s"Batch: $epochId")
}

protected def printRows(
    commitMessages: Array[WriterCommitMessage],
    schema: StructType,
    printMessage: String): Unit = {
[...]
  // scalastyle:off println
  println("-------------------------------------------")
  println(printMessage)
  println("-------------------------------------------")
  // scalastyle:off println
[...]
}

That is, unless there is a way to suppress the println statements themselves.
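
As a possible workaround (my own sketch, not tested against your setup): Spark 2.4 introduced foreachBatch, which hands you each micro-batch as a regular DataFrame, so you can print it yourself without the hard-coded banner:

# Sketch of a workaround, assuming Spark 2.4+ (foreachBatch is not
# available in earlier versions): print each micro-batch yourself
# instead of using the console sink, so no "Batch: N" banner appears.
def show_batch(batch_df, epoch_id):
    # batch_df is a plain (non-streaming) DataFrame here, so show()
    # prints only the table.
    batch_df.show(truncate=False)

query1 = incoming_read.writeStream.foreachBatch(show_batch).start()
query1.awaitTermination()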

Also, how can I actually see the last line, 'PySpark demo app finished.'?

You need to terminate your application, either by sending a SIGTERM signal or by killing the submitted application. I am not sure, though, whether you will see the print statement in the very last line of your code. The producer has nothing to do with this.
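
If you want that final print to be reachable without killing the process, one option (my own sketch, not part of your original code) is to pass a timeout to awaitTermination and then stop the query explicitly:

# Sketch: instead of blocking forever, wait up to a fixed timeout
# (in seconds) and then stop the query, so execution reaches the
# final print statement. The 60-second value is an arbitrary example.
finished = query1.awaitTermination(timeout=60)  # returns True if terminated
query1.stop()
print("PySpark demo app finished.")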

Upvotes: 1
