Reputation: 57
Below is my code. I have tried many different select variations. The app runs, but it does not show the messages, which are being written to Kafka every second. A Spark Streaming example that uses pprint() confirms that Kafka is in fact receiving messages every second. The messages in Kafka are JSON formatted; see the schema for the field/column labels:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics
KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"
if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")
    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) \
        .add("reading_time", IntegerType()) \
        .add("tractionForce", IntegerType()) \
        .add("vel_latitudinal", IntegerType()) \
        .add("vel_longitudinal", IntegerType()) \
        .add("velocity", IntegerType()) \
        .add("x_pos", IntegerType()) \
        .add("y_pos", IntegerType()) \
        .add("yawrate", IntegerType())
    # Construct a streaming DataFrame that reads from testtopic
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")
    #(from_json(col("value").cast("string"),schema))
    #Q1 = trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
    #Q2 = trans_det_d.select("parsed_value*", "timestamp")
    query = trans_det_df.writeStream \
        .format("console") \
        .option("truncate","false") \
        .start() \
        .awaitTermination()
Upvotes: 4
Views: 2094
Reputation: 191743
kafka.bootstrap.servers is the Kafka broker address (default port 9092), not ZooKeeper (port 2181).
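As a rough sketch of that fix, the source options could be built like this. It assumes the broker runs on the same host as in the question and listens on the default port 9092, which may not match your setup. The helpers `kafka_source_options` and `read_vehicle_events` are hypothetical names used only for illustration, and the port check is a heuristic, since 2181 is merely ZooKeeper's default.

```python
KAFKA_TOPIC = "vehicle_events_fast_testdata"
# Assumption: the broker is on the host from the question, on the default
# broker port 9092 (instead of the ZooKeeper port 2181).
KAFKA_SERVER = "10.2.0.6:9092"


def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Build the options for Spark's Kafka source as a plain dict (hypothetical helper)."""
    for entry in bootstrap_servers.split(","):
        port = entry.strip().rpartition(":")[2]
        # Heuristic only: 2181 is the default ZooKeeper client port.
        if port == "2181":
            raise ValueError(
                f"{entry.strip()!r} uses the default ZooKeeper port 2181; "
                "point kafka.bootstrap.servers at a Kafka broker instead"
            )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_vehicle_events(spark, options):
    """Create the streaming DataFrame; `spark` is an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()
```

With a session in hand, `read_vehicle_events(spark, kafka_source_options(KAFKA_SERVER, KAFKA_TOPIC))` would replace the `readStream` chain from the question.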
Also note that your startingOffsets option is set to latest, so you must produce data after starting the streaming application.
If you want to see data that already exists in the topic, set startingOffsets to earliest.
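A minimal sketch of switching between the two offset modes might look like the following. The function names `offset_options` and `console_stream` are made up for this example, and the broker address and topic are passed in by the caller rather than assumed.

```python
def offset_options(replay_existing):
    """Return the startingOffsets option for a new streaming query (hypothetical helper).

    "earliest" replays records already retained in the topic; "latest" only
    shows records produced after the query has started.
    """
    return {"startingOffsets": "earliest" if replay_existing else "latest"}


def console_stream(spark, bootstrap_servers, topic, replay_existing=True):
    """Start a console query; `spark` is an existing SparkSession."""
    events = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .options(**offset_options(replay_existing))
        .load()
        # Kafka delivers the payload as bytes, so cast it to a readable string.
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
    )
    return events.writeStream.format("console").option("truncate", "false").start()
```

Calling `console_stream(spark, "host:9092", "vehicle_events_fast_testdata").awaitTermination()` would then print existing records first. Note that startingOffsets only applies when a query starts fresh; a query resumed from a checkpoint continues from where it left off.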
Upvotes: 5