Thomas

Reputation: 1219

Count messages from a Kafka topic every hour

I would like to count the messages coming from a Kafka topic.

For example, I have this case class:

case class Message(timestamp: LocalDateTime)

I receive messages of this class, and I would like to count how many messages I get within each hour. Let's suppose messages are ordered in this topic (the timestamp corresponds to when the message entered the topic).

I would like to create a case class like this:

case class Counter(datetime: LocalDateTime, count: Int)

Let's say I have 100 messages in the first hour, then 150 in the second; I would then have:

Counter("2018-05-17 00:00:00", 100)
Counter("2018-05-17 01:00:00", 150)
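To make the expected output concrete: over a batch of messages already in memory, each timestamp is truncated to the start of its hour and the messages are counted per hour. This is a minimal sketch, assuming `timestamp` is a `java.time.LocalDateTime` (the question does not say which `LocalDateTime` it uses); `countPerHour` is a hypothetical helper name.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

case class Message(timestamp: LocalDateTime)
case class Counter(datetime: LocalDateTime, count: Int)

// Truncate every timestamp to the start of its hour, count messages per hour,
// and return the counters in chronological order.
def countPerHour(messages: Seq[Message]): Seq[Counter] =
  messages
    .groupBy(_.timestamp.truncatedTo(ChronoUnit.HOURS))
    .map { case (hour, inHour) => Counter(hour, inHour.size) }
    .toSeq
    .sortWith((a, b) => a.datetime.isBefore(b.datetime))

// 100 messages in the first hour, 150 in the second
val base = LocalDateTime.of(2018, 5, 17, 0, 0)
val msgs = Seq.tabulate(100)(i => Message(base.plusSeconds(i * 30L))) ++
  Seq.tabulate(150)(i => Message(base.plusHours(1).plusSeconds(i * 20L)))

countPerHour(msgs).foreach(println)
// Counter(2018-05-17T00:00,100)
// Counter(2018-05-17T01:00,150)
```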

Any idea how to do that? For information, I can't/don't want to use Kafka Streams.

Thanks!

EDIT:

My source is a Kafka topic, which I would like to read with the Consumer API. My sink is a PostgreSQL table.

Upvotes: 0

Views: 1981

Answers (2)

Thomas

Reputation: 1219

I figured out a solution with Flink.

I read some documentation about time windows in Flink, and this page talks about ascending timestamps in a topic (which is my case).

So here is a solution:

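  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // window on message timestamps, not arrival time
  val inputStream: DataStream[Message] = env.addSource(kafkaConsumer)
  val timedStream: DataStream[Message] = inputStream
    .assignAscendingTimestamps(_.timestamp.toDate.getTime) // Joda-Time LocalDateTime -> epoch millis (Long)
  val timeWindow = timedStream.map(_ => ("all", 1)).timeWindowAll(Time.minutes(1)).sum(1) // sum(1) needs a numeric tuple field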

It counts all elements in each tumbling (non-overlapping) window of 1 minute; use Time.hours(1) instead for hourly counts.
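Conceptually, a tumbling window of size `size` puts an event with timestamp `ts` (epoch milliseconds) into the window that starts at `ts - ts % size` and ends, exclusively, at that start plus `size`. As far as I know, this matches how Flink aligns tumbling windows to the epoch with its default zero offset. The plain-Scala sketch below shows only that assignment and the per-window count; `windowStart` and `countPerWindow` are hypothetical names, not Flink API, and non-negative timestamps are assumed.

```scala
// Start (inclusive) of the tumbling window containing ts; the window ends at start + size (exclusive).
// Assumes non-negative epoch-millisecond timestamps and a zero window offset.
def windowStart(ts: Long, size: Long): Long = ts - ts % size

// Count events per tumbling window, ordered by window start.
def countPerWindow(timestamps: Seq[Long], size: Long): Seq[(Long, Int)] =
  timestamps
    .groupBy(ts => windowStart(ts, size))
    .map { case (start, inWindow) => (start, inWindow.size) }
    .toSeq
    .sortBy(_._1)

val minute = 60000L
// Three events fall in the window [0, 60000), two in [60000, 120000)
val events = Seq(0L, 15000L, 59999L, 60000L, 119999L)
println(countPerWindow(events, minute))
```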

For a more specific result, matching Counter("2018-05-17 00:00:00", 100), we have to extend AllWindowFunction:

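  import org.apache.flink.streaming.api.scala.function.AllWindowFunction
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  import org.apache.flink.util.Collector
  import org.joda.time.LocalDateTime

  // Emits one Counter per window: the window start time and the number of messages in it
  class CustomWindowFunction extends AllWindowFunction[Message, Counter, TimeWindow] {
    override def apply(window: TimeWindow, input: Iterable[Message], out: Collector[Counter]): Unit = {
      out.collect(
        Counter(
          new LocalDateTime(window.getStart), // window start (epoch millis) -> Joda-Time LocalDateTime
          input.size
        )
      )
    }
  }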

And then apply it to our timedStream:

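  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // window on message timestamps
  val inputStream: DataStream[Message] = env.addSource(kafkaConsumer)
  val timedStream: DataStream[Message] = inputStream
    .assignAscendingTimestamps(_.timestamp.toDate.getTime) // Joda-Time LocalDateTime -> epoch millis (Long)
  val timeWindow: DataStream[Counter] = timedStream.timeWindowAll(Time.minutes(1)).apply(new CustomWindowFunction())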

So if the input topic contains Message records, we get Counter records at the end.

This is the best solution I have found so far.

Upvotes: 0

Jimeux

Reputation: 3026

The solution you want is usually called windowing in stream processing terms, and most stream processing libraries have this as a feature. There's a good writeup by Software Mill comparing Spark Streaming, Flink, Kafka Streams, and Akka Streams.

You could attempt to implement it yourself, but the libraries mentioned above are all battle tested and have simple, readable APIs. If you don't want to use Kafka Streams, then Akka Streams Kafka mentioned in one of the comments (part of the Alpakka project) would be worth considering.
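If you do implement it yourself with the plain Consumer API, the core can stay small when, as in the question, messages arrive ordered by timestamp: keep the current hour and a running count, and emit a finished Counter as soon as a message from a later hour arrives. This is a minimal sketch, assuming `java.time.LocalDateTime` timestamps; `HourlyCounter`, `add` and `flush` are hypothetical names, and the KafkaConsumer poll loop and the PostgreSQL insert are left out (each emitted Counter would become one row).

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

case class Message(timestamp: LocalDateTime)
case class Counter(datetime: LocalDateTime, count: Int)

// Hypothetical tumbling one-hour counter for a stream already ordered by timestamp.
// Feed it each message as it is polled from the topic.
final class HourlyCounter {
  private var currentHour: Option[LocalDateTime] = None
  private var count = 0

  // Returns the finished Counter when msg starts a new hour, otherwise None.
  def add(msg: Message): Option[Counter] = {
    val hour = msg.timestamp.truncatedTo(ChronoUnit.HOURS)
    currentHour match {
      case Some(h) if h == hour =>
        count += 1
        None
      case previous =>
        val finished = previous.map(h => Counter(h, count))
        currentHour = Some(hour)
        count = 1
        finished
    }
  }

  // Emits the still-open (possibly incomplete) hour, e.g. on shutdown, and resets the state.
  def flush(): Option[Counter] = {
    val open = currentHour.map(h => Counter(h, count))
    currentHour = None
    count = 0
    open
  }
}

// Simulated poll results: 100 messages in the first hour, 150 in the second
val base = LocalDateTime.of(2018, 5, 17, 0, 0)
val counter = new HourlyCounter
val polled = Seq.tabulate(100)(i => Message(base.plusSeconds(i * 30L))) ++
  Seq.tabulate(150)(i => Message(base.plusHours(1).plusSeconds(i * 20L)))
val emitted = polled.flatMap(m => counter.add(m)) ++ counter.flush()
emitted.foreach(println)
```

Because it relies on ordering, a late message from an earlier hour would open a new bucket instead of updating the old one; handling late data is one of the things the libraries above take care of.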

Upvotes: 1
