Raanan Ikar

Reputation: 57

pySpark Structured Streaming from Kafka does not output to console for debugging

Below is my code. I have tried many different select variations, and yet the app runs without showing the messages that are being written every second. I have a Spark Streaming (DStream) example which, using pprint(), confirms that Kafka is in fact receiving messages every second. The messages in Kafka are JSON formatted; see the schema below for the field/column labels:

from pyspark.sql import SparkSession  # required for SparkSession.builder below
from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics


KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"

if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) \
        .add("reading_time", IntegerType()) \
        .add("tractionForce", IntegerType()) \
        .add("vel_latitudinal", IntegerType()) \
        .add("vel_longitudinal", IntegerType()) \
        .add("velocity", IntegerType()) \
        .add("x_pos", IntegerType()) \
        .add("y_pos", IntegerType()) \
        .add("yawrate", IntegerType())


    # Construct a streaming DataFrame that reads from the Kafka topic
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")


    # Earlier attempts at parsing the JSON payload (kept for reference):
    # Q1 = trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
    # Q2 = Q1.select("parsed_value.*", "timestamp")


    query = trans_det_df.writeStream \
            .format("console") \
            .option("truncate", "false") \
            .start()

    query.awaitTermination()

Upvotes: 4

Views: 2094

Answers (1)

OneCricketeer

Reputation: 191743

kafka.bootstrap.servers must point to the Kafka broker address (default port 9092), not to Zookeeper (port 2181).

Also note that your starting offsets are set to latest, so you must produce data after starting the streaming application.

If you want to see existing topic data, use the earliest starting offsets instead.
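
For example, here is a minimal sketch of the corrected source configuration. The broker address 10.2.0.6:9092 is an assumption based on the default broker port; substitute your actual broker host:port.

    # A minimal sketch of the corrected reader. The broker address below is an
    # assumption (default port 9092); substitute your actual broker host:port.
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "10.2.0.6:9092") \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)", "CAST(topic AS STRING)")

With earliest offsets, the first micro-batch replays everything already in the topic, so the console sink should print rows immediately instead of waiting for new messages to arrive.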

Upvotes: 5
