Timothy

Reputation: 1035

Kafka Streams groupBy based on timestamp

I use Kafka for a voting app in which a user can choose a candidate and change that selection during a 1-hour time range.

Since this is a good fit for a KTable, I use a Kafka Streams app. However, there is a time-range requirement: I need to groupBy().count() only for a specific time range (e.g. 10:00-11:00).

How can I achieve this using the Kafka Streams API?
As far as I know, Kafka (I use Kafka 2.3) puts the publish timestamp in the record metadata, but how do I access it? I'm thinking of using .filter() based on the timestamp.

I have also seen the windowing documentation, but the time there seems to be relative (e.g. the last 1 hour) rather than fixed (10:00-11:00).

Thank you

Upvotes: 1

Views: 3371

Answers (2)

bbejeck

Reputation: 1360

Timothy,

To access the timestamp of the record, you can use a transformValues() operation. The ValueTransformer you supply has access to the ProcessorContext, and you can call ProcessorContext.timestamp() in the ValueTransformer.transform() method. If the timestamp is within the desired range, return the record's value; otherwise return null. Then add a filter() after the transformValues() to remove the records you've rejected.

Here's an example I think will work:

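import java.time.Instant;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

class GroupByTimestampExample {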

  public static void main(String[] args) {

    final StreamsBuilder builder = new StreamsBuilder();
    // You need to update the time fields; these are just placeholders
    long earliest = Instant.now().toEpochMilli();
    long latest = Instant.now().toEpochMilli() + (60 * 60 * 1000);

    final ValueTransformerSupplier<String, String> valueTransformerSupplier = new TimeFilteringTransformer(earliest, latest);

    final KTable<String, Long> voteTable = builder.<String, String>stream("topic")
                                            .transformValues(valueTransformerSupplier)
                                            .filter((k, v) -> v != null)
                                            .groupByKey()
                                            .count();

  }




  static final class TimeFilteringTransformer implements ValueTransformerSupplier<String, String> {

    private final long earliest;
    private final long latest;

    public TimeFilteringTransformer(final long earliest, final long latest) {
      this.earliest = earliest;
      this.latest = latest;
    }

    @Override
    public ValueTransformer<String, String> get() {
      return new ValueTransformer<String, String>() {
        private ProcessorContext processorContext;

        @Override
        public void init(ProcessorContext context) {
          processorContext = context;
        }

        @Override
        public String transform(String value) {
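          // Record timestamp taken from the metadata of the record being processed
          long ts = processorContext.timestamp();
          if (ts >= earliest && ts <= latest) {
            return value;
          }
          // Out of range: mark for removal by the downstream filter()
          return null;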
        }

        @Override
        public void close() {

        }
      };
    }
  }
}
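
The earliest/latest placeholders above could be derived from a fixed wall-clock range with java.time. This is only a sketch in plain Java (no Kafka calls); the class name VotingWindow, the helper toEpochMillis, the date and the UTC zone are all assumptions you would replace with your own values.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class VotingWindow {

    // Epoch milliseconds for the given local date and time in the given zone.
    static long toEpochMillis(LocalDate date, LocalTime time, ZoneId zone) {
        return ZonedDateTime.of(date, time, zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("UTC");            // assumption: votes are evaluated in UTC
        LocalDate day = LocalDate.of(2019, 9, 1);  // hypothetical voting day

        long earliest = toEpochMillis(day, LocalTime.of(10, 0), zone);
        long latest = toEpochMillis(day, LocalTime.of(11, 0), zone);

        // These two values would be passed to new TimeFilteringTransformer(earliest, latest)
        System.out.println(earliest + " .. " + latest);
    }
}
```

Note that the transformer compares with ts <= latest, so a vote stamped exactly 11:00:00.000 is still counted; switch that to < if the end of the range should be exclusive.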

Let me know how it goes.

Upvotes: 2

Tuyen Luong

Reputation: 1366

Actually, tumbling windows are fixed-size, non-overlapping, gap-less windows, and they are aligned to the epoch rather than to the time the app starts. In your use case the window duration is one hour, so a 10:00-11:00 window will be created, as in your example (start inclusive, end exclusive):

kStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();
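
To see why a one-hour tumbling window lines up with 10:00-11:00, here is a minimal plain-Java sketch of the alignment arithmetic, assuming (as the Kafka Streams documentation describes for time windows) that windows are aligned to the epoch. It does not call Kafka; the names TumblingWindowBounds, windowStart and windowEnd and the sample vote time are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

class TumblingWindowBounds {

    // Inclusive start of the epoch-aligned window of length sizeMs that contains timestampMs.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end of that same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofHours(1).toMillis();
        // Hypothetical vote cast at 10:42:17 UTC
        long vote = Instant.parse("2019-09-01T10:42:17Z").toEpochMilli();

        System.out.println("start (inclusive): " + Instant.ofEpochMilli(windowStart(vote, sizeMs)));
        System.out.println("end   (exclusive): " + Instant.ofEpochMilli(windowEnd(vote, sizeMs)));
    }
}
```

The alignment is in UTC, so in a time zone whose offset is not a whole number of hours the boundaries would not fall on the local hour. Also keep in mind that the windowed count yields one result per hourly window, so you would still need to pick out the 10:00 window from the results.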

Upvotes: 1
