erdavila

Reputation: 371

How to count items per time window?

I'm trying to use Spark Structured Streaming to count the number of items read from Kafka per time window, with the code below:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object Counter extends App {
  val dateFormatter = new SimpleDateFormat("HH:mm:ss")
  val spark = ...
  import spark.implicits._

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ...)
    .option("subscribe", ...)
    .load()

  val windowDuration = "5 minutes"
  val counts = df
    .select("value").as[Array[Byte]]
    .map(decodeTimestampFromKafka).toDF("timestamp")
    .select($"timestamp" cast "timestamp")
    .withWatermark("timestamp", windowDuration)                 // accept events up to 5 minutes late
    .groupBy(window($"timestamp", windowDuration, "1 minute"))  // 5-minute windows sliding every 1 minute
    .count()
    .as[((Long, Long), Long)]

  val writer = new ForeachWriter[((Long, Long), Long)] {
    var partitionId: Long = _
    var version: Long = _

    def open(partitionId: Long, version: Long): Boolean = {
      this.partitionId = partitionId
      this.version = version
      true
    }

    def process(record: ((Long, Long), Long)): Unit = {
      val ((start, end), docs) = record
      val startDate = dateFormatter.format(new Date(start))
      val endDate = dateFormatter.format(new Date(end))
      val now = dateFormatter.format(new Date)
      println(s"$now:$this|$partitionId|$version: ($startDate, $endDate) $docs")
    }

    def close(errorOrNull: Throwable): Unit = {}
  }

  val query = counts
    .repartition(1)
    .writeStream
    .outputMode("complete")
    .foreach(writer)
    .start()

  query.awaitTermination()

  def decodeTimestampFromKafka(bytes: Array[Byte]): Long = ...
}

I expected that, once each minute (the slide duration), it would output a single record (since the only aggregation key is the window) with the item count for the last 5 minutes (the window duration). However, it outputs several records 2-3 times per minute, as in this sample:

...
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:20, 22:43:20) 383
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:18, 22:43:19) 435
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:42:33, 22:42:34) 395
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:14, 22:43:14) 435
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:09, 22:43:09) 437
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:19, 22:43:19) 411
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:07, 22:43:07) 400
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:17, 22:43:17) 392
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:37, 22:43:38) 420
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:25, 22:43:25) 395
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:22, 22:43:22) 416
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:00, 22:43:00) 438
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:41, 22:43:41) 426
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:44:13, 22:44:13) 132
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:44:02, 22:44:02) 128
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:44:09, 22:44:09) 120
...

Changing the output mode to append seems to change the behavior, but the result is still far from what I expected.

What is wrong with my assumptions about how this should work? Given the code above, how should the sample output be interpreted or used?

Upvotes: 2

Views: 1318

Answers (1)

Arnon Rotem-Gal-Oz

Reputation: 25939

You are allowing late events of up to 5 minutes to be counted and to update windows that have already been calculated (that is what withWatermark does). See "Handling Late Data and Watermarking" in the Spark Structured Streaming programming guide.
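If the goal is a single, finalized count per window, one arrangement that follows from this is a shorter watermark combined with the append output mode, so a window is emitted only once, after the watermark guarantees it can no longer be updated by late data. A minimal sketch, reusing df and decodeTimestampFromKafka from the question; the 1-minute lateness tolerance is an assumed value, not something from the original post:

  // Minimal sketch, assuming the same `df` and `decodeTimestampFromKafka` as
  // in the question. The 1-minute lateness tolerance is an assumption.
  val counts = df
    .select("value").as[Array[Byte]]
    .map(decodeTimestampFromKafka).toDF("timestamp")
    .select($"timestamp" cast "timestamp")
    .withWatermark("timestamp", "1 minute")                   // tolerate at most 1 minute of lateness
    .groupBy(window($"timestamp", "5 minutes", "1 minute"))   // 5-minute windows sliding every minute
    .count()

  val query = counts.writeStream
    .outputMode("append")   // emit each window once, after the watermark passes its end
    .format("console")
    .start()

  query.awaitTermination()

In append mode a window appears in the output only after the watermark has moved past the window's end, so each window is printed once with its final count, instead of the whole result table being re-emitted on every trigger as happens with the complete mode.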

Upvotes: 1
