Himanshu Yadav

Reputation: 13585

Spark Streaming: Writing number of rows read from a Kafka topic

My Spark Streaming job reads events from a busy Kafka topic. To get a sense of how much data is coming in per trigger interval, I want to output the count of rows read from the topic. I tried multiple ways of doing that but could not figure it out.

Dataset<Row> stream = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .option("enable.auto.commit", false)
//      .option("failOnDataLoss", false)
//      .option("maxOffsetsPerTrigger", 10000)
        .load();

stream.selectExpr("topic").agg(count("topic")).as("count");
//stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));

stream.writeStream()
        .format("console")
        .option("truncate", false)
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start();

Upvotes: 3

Views: 2078

Answers (1)

OneCricketeer

Reputation: 191874

Looks like you need to assign the result of the transformation. Datasets are immutable, so the aggregation on a line by itself returns a new Dataset that is then discarded. The alias also belongs on the aggregate column, not on the Dataset:

stream = stream.selectExpr("topic").agg(count("topic").as("count"));

And then you can print that with your console sink. Note that a streaming aggregation needs outputMode("complete") or "update" there; the default "append" mode will fail.
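For reference, here is a minimal end-to-end sketch of the corrected job. It assumes the Structured Streaming Java API; the bootstrap servers, topic name, and class name are placeholders you would replace with your own:

import static org.apache.spark.sql.functions.count;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class KafkaRowCount {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-row-count")
                .getOrCreate();

        // Read the Kafka topic as an unbounded Dataset of records.
        Dataset<Row> stream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
                .option("subscribe", "my-topic")                     // placeholder
                .option("startingOffsets", "latest")
                .load();

        // Reassign: transformations return a new Dataset, and the alias
        // goes on the aggregate column, not on the Dataset.
        Dataset<Row> counts = stream.agg(count("topic").as("count"));

        // A streaming aggregation is not supported in the default "append"
        // output mode on the console sink; "complete" prints the running
        // total every trigger.
        StreamingQuery query = counts.writeStream()
                .outputMode("complete")
                .format("console")
                .option("truncate", false)
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .start();

        query.awaitTermination();
    }
}

In complete mode the printed count is a running total across all micro-batches, not a per-trigger figure. If you specifically want the number of rows read per trigger interval, numInputRows in query.lastProgress() reports that for the most recent batch, with no aggregation needed in the query itself.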

Upvotes: 5
