Anirban Das

Reputation: 61

How to count the number of records processed by Apache Flink in a given time window

After defining a time window in Flink as follows:

val lines = socket.timeWindowAll(Time.seconds(5))

How can I compute the number of records in that particular window of 5 seconds?

Upvotes: 3

Views: 5730

Answers (2)

Fabian Hueske

Reputation: 18987

The most efficient way to perform a count aggregation is a ReduceFunction. However, reduce has the restriction that input and output type must be identical. So you would have to convert the input to an Int before applying the window:

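import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val socket: DataStream[String] = ???   // placeholder for your input stream

val cnts: DataStream[Int] = socket
  .map(_ => 1)                    // convert every record to 1
  .timeWindowAll(Time.seconds(5)) // group into 5 second windows
  .reduce( (x, y) => x + y)       // sum the 1s to get the count per window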
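To make the semantics of the map/timeWindowAll/reduce pipeline concrete, here is a minimal plain-Scala sketch with no Flink dependency. It assumes hypothetical records that carry an explicit timestamp in milliseconds (the real Flink job uses the stream's configured notion of time instead), assigns each record to the 5-second tumbling window that contains it, and sums one `1` per record, which is what `map(_ => 1)` followed by `reduce` does inside each window. `Record`, `windowStart` and `countPerWindow` are illustrative names, not Flink API.

```scala
// Plain-Scala sketch (no Flink) of what
// map(_ => 1).timeWindowAll(Time.seconds(5)).reduce(_ + _) computes.
object TumblingCountSketch {
  // Hypothetical record type: a payload plus a timestamp in milliseconds.
  final case class Record(payload: String, timestampMs: Long)

  // Start of the tumbling window containing ts (assumes ts >= 0 and no window offset).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Count records per window, returned as (windowStart, count) sorted by start.
  def countPerWindow(records: Seq[Record], sizeMs: Long): Seq[(Long, Int)] =
    records
      .map(r => (windowStart(r.timestampMs, sizeMs), 1))                   // like map(_ => 1)
      .groupBy(_._1)                                                       // like timeWindowAll
      .map { case (start, ones) => (start, ones.map(_._2).reduce(_ + _)) } // like reduce
      .toSeq
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record("a", 1000L), Record("b", 4999L),  // window [0, 5000)
      Record("c", 5000L),                      // window [5000, 10000)
      Record("d", 12000L), Record("e", 14000L) // window [10000, 15000)
    )
    countPerWindow(records, 5000L).foreach { case (start, n) =>
      println(s"window [$start, ${start + 5000}): $n records")
    }
  }
}
```

Running `main` prints one line per window: 2 records for [0, 5000), 1 for [5000, 10000) and 2 for [10000, 15000). Windows that receive no records produce no output, which matches the behaviour of a windowed reduce.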

Upvotes: 6

You could try this; it may solve your problem.

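import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// `senv` is predefined in the Flink Scala shell; elsewhere, create it with
// val senv = StreamExecutionEnvironment.getExecutionEnvironment
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map { (m: String) => (m.split(",")(0), 1) } // (first field, 1)
    .keyBy(0)                                      // group by the first field
    .timeWindow(Time.seconds(10), Time.seconds(5)) // 10 s windows, sliding every 5 s
    .sum(1)                                        // sum the 1s: count per key and window
counts.print()
senv.execute("ProcessingTime processing example")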
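Note that this code counts per key over overlapping windows rather than counting all records in one 5-second window. The plain-Scala sketch below (no Flink dependency) illustrates that difference, assuming hypothetical `(line, timestampMs)` input pairs and no window offset: with a 10 s window sliding every 5 s, each record falls into two windows, so it is counted twice. The start-time arithmetic follows the usual sliding-window assignment (latest start aligned to the slide, then stepping back while the window still covers the timestamp); `windowStarts` and `countPerKeyAndWindow` are illustrative names, not Flink API.

```scala
// Plain-Scala sketch (no Flink) of keyBy(0).timeWindow(size, slide).sum(1)
// applied to (key, 1) tuples.
object SlidingKeyedCountSketch {
  // Starts of all sliding windows [start, start + sizeMs) that contain ts,
  // for windows advancing every slideMs (assumes ts >= 0 and no offset).
  def windowStarts(ts: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toList
  }

  // Count records per (key, window start). Input pairs are hypothetical
  // (line, timestampMs) values.
  def countPerKeyAndWindow(
      events: Seq[(String, Long)],
      sizeMs: Long,
      slideMs: Long
  ): Map[(String, Long), Int] =
    events
      .flatMap { case (line, ts) =>
        val key = line.split(",")(0) // same key extraction as the answer's map
        windowStarts(ts, sizeMs, slideMs).map(start => ((key, start), 1))
      }
      .groupBy(_._1)                                     // like keyBy + window
      .map { case (k, ones) => (k, ones.map(_._2).sum) } // like sum(1)

  def main(args: Array[String]): Unit = {
    val events = Seq(("a,x", 7000L), ("a,y", 12000L), ("b,z", 7000L))
    countPerKeyAndWindow(events, 10000L, 5000L).toSeq.sortBy(_._1).foreach {
      case ((key, start), n) => println(s"key $key, window [$start, ${start + 10000}): $n")
    }
  }
}
```

Here key `a` gets counts 1, 2 and 1 for the windows starting at 0, 5000 and 10000 ms, and key `b` gets 1 for the windows starting at 0 and 5000 ms. If you want a single count of all records per 5 seconds, a tumbling window such as `timeWindowAll(Time.seconds(5))` is the closer match.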

Upvotes: 2
