Reputation: 13585
Spark streaming job is reading events from a busy kafka topic. To get a sense of how much data is coming in per trigger interval, I want to just output count of rows read from the topic. I tried multiple ways of doing that but could not figure it out.
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
// .option("failOnDataLoss", false)
// .option("maxOffsetsPerTrigger", 10000)
.load();
stream.selectExpr("topic").agg(count("topic")).as("count");
//stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
stream.writeStream()
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start();
Upvotes: 3
Views: 2078
Reputation: 191874
Looks like you need
stream = stream.selectExpr("topic").agg(count("topic")).as("count");
And then you can print that
Upvotes: 5