张学文

Reputation: 1

How to achieve the equivalent of the SQL query select a, count(distinct(b)) from x group by a with Kafka Streams?

My idea is:

First group by key, so that each (ip, device) pair is unique; then map each pair so that the IP is the key and the device is the value; then group by key again. I expect the resulting count to be the number of devices for each IP.
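Outside of a stream, this two-step idea is equivalent to select ip, count(*) from (select distinct ip, device from x) group by ip. As a point of reference, here is a plain-Java batch sketch of the same two steps (no Kafka involved; the class and method names are hypothetical, and values are assumed to have the form "ip,deviceId"):

```java
import java.util.*;

// Batch (non-streaming) sketch of the two-step idea:
// step 1 keeps each distinct (ip, device) pair once,
// step 2 re-keys the pairs by IP and counts them.
public class DistinctDevicesBatch {

    // Returns ip -> number of distinct devices, for values of the form "ip,deviceId".
    public static Map<String, Long> countDistinctDevices(List<String> values) {
        // Step 1: de-duplicate the (ip, device) pairs.
        Set<String> distinctPairs = new LinkedHashSet<>(values);

        // Step 2: re-key each remaining pair by IP and count.
        Map<String, Long> counts = new TreeMap<>();
        for (String pair : distinctPairs) {
            String ip = pair.split(",")[0];
            counts.merge(ip, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList(
            "127.0.0.1,aa-bb-cc",
            "127.0.0.1,aa-bb-cc",
            "127.0.0.1,aa-bb-cc");
        System.out.println(countDistinctDevices(values)); // {127.0.0.1=1}
    }
}
```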

The Kafka records look like this:

key   value (ip,deviceId)
1     127.0.0.1,aa-bb-cc
2     127.0.0.1,aa-bb-cc
3     127.0.0.1,aa-bb-cc
...   (more records, all with the value 127.0.0.1,aa-bb-cc)

I want to get the number of distinct deviceIds for each IP in each hopping time window.
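To make the target semantics concrete, here is a Kafka-free sketch in plain Java that assigns each record to its hopping windows and counts distinct devices per (IP, window). It assumes the alignment Kafka Streams uses for hopping windows (window starts are multiples of the advance, counted from the epoch) and the 60 s size / 6 s advance used in the code; the class and method names are hypothetical.

```java
import java.util.*;

// Plain-Java model of hopping-window assignment plus a per-window distinct count.
// Window starts are multiples of ADVANCE_MS, as with epoch-aligned hopping windows.
public class HoppingDistinct {

    static final long SIZE_MS = 60_000;   // TimeWindows.of(1000 * 60)
    static final long ADVANCE_MS = 6_000; // advanceBy(1000 * 6)

    // All window start times whose [start, start + SIZE_MS) range contains the timestamp.
    public static List<Long> windowStartsFor(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0, timestamp - SIZE_MS + ADVANCE_MS) / ADVANCE_MS) * ADVANCE_MS;
        for (; start <= timestamp; start += ADVANCE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // "ip@windowStart" -> number of distinct devices; each value is "ip,deviceId".
    public static Map<String, Integer> distinctPerWindow(long[] timestamps, String[] values) {
        Map<String, Set<String>> devices = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            String[] parts = values[i].split(",");
            for (long start : windowStartsFor(timestamps[i])) {
                devices.computeIfAbsent(parts[0] + "@" + start, k -> new HashSet<>()).add(parts[1]);
            }
        }
        Map<String, Integer> counts = new TreeMap<>();
        devices.forEach((key, set) -> counts.put(key, set.size()));
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_543_495_070_000L, 1_543_495_071_000L, 1_543_495_072_000L};
        String[] values = {"127.0.0.1,aa-bb-cc", "127.0.0.1,aa-bb-cc", "127.0.0.1,aa-bb-cc"};
        // Each record falls into SIZE_MS / ADVANCE_MS = 10 overlapping windows.
        System.out.println(windowStartsFor(timestamps[0]).size()); // 10
        // Every (ip, window) pair sees exactly one distinct device.
        Map<String, Integer> counts = distinctPerWindow(timestamps, values);
        System.out.println(counts.size() + " windows, all counts == 1: "
            + counts.values().stream().allMatch(c -> c == 1)); // 10 windows, all counts == 1: true
    }
}
```

Note that every record lands in ten overlapping windows, so any per-window aggregation is updated ten times per input record.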

My code:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> records = builder.stream(topic);

// Swap key and value: the new key is "ip,deviceId", the new value is the old key
KStream<String, String> formattedRecords = records.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {

        @Override
        public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(value, key);
        }
});

// Stage 1: count records per (ip, deviceId) pair in each hopping window
formattedRecords.groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
        @Override
        public String apply(Windowed<String> key, Long value) {
            return key.toString();
        }

    }).map(new KeyValueMapper<String, Long, KeyValue<String, String>>() {

        // Split the "ip,deviceId" key: IP becomes the key, deviceId the value
        @Override
        public KeyValue<String, String> apply(String key, Long value) {
            String[] keys = key.split(",");
            return new KeyValue<>(keys[0], keys[1]);
        }
    // Stage 2: count per IP in each hopping window
    }).groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
        @Override
        public String apply(Windowed<String> key, Long value) {
            return key.toString();
        }

    }).map(new KeyValueMapper<String, Long, KeyValue<String, String>>() {

        @Override
        public KeyValue<String, String> apply(String key, Long value) {
            return new KeyValue<>(key, "" + value);
        }
    }).to("topic");

The expected result for each time window is:

key       value

127.0.0.1@1543495068000/1543495188000 1

127.0.0.1@1543495074000/1543495194000 1

127.0.0.1@1543495080000/1543495200000 1

But the actual result is:

127.0.0.1@1543495068000/1543495188000 3

127.0.0.1@1543495074000/1543495194000 4

127.0.0.1@1543495080000/1543495200000 1

Why is that?

Any help would be appreciated.

Upvotes: 0

Views: 696

Answers (1)

Mariusz

Reputation: 13926

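Your code chains two windowed aggregations, and this is most likely the cause of the problem. The first count() does not emit each distinct (ip, deviceId) pair once; it emits a stream of updates to that pair's count (how many depends on record caching), and with a 60 s window advancing by 6 s every record belongs to 10 windows. The second count() then counts those update records rather than distinct devices. I'd propose this flow: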

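import java.util.Arrays;
import java.util.HashSet;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStream<Windowed<String>, String> distinctCounts = records.map((key, value) -> {
      String[] data = value.split(","); // value is "ip,deviceId"
      return KeyValue.pair(data[0], data[1]);
    })
    .groupByKey() // by IP
    .windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60))
    .reduce((device1, device2) -> device1 + "|" + device2) // "|"-joined devices per IP and window
    .toStream() // stream of lists of devices per IP in window
    // split() takes a regex, so "|" must be escaped; the set keeps distinct devices
    .mapValues(devices -> new HashSet<>(Arrays.asList(devices.split("\\|"))))
    .mapValues(set -> String.valueOf(set.size()));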

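The resulting KStream is a windowed stream of (IP, count(distinct(devices))): the key is a Windowed<String> holding the IP and the window bounds, and the value is the count as a String, so you can forward it to another topic (with a serde for windowed keys, or after mapping the key to a plain String). The method assumes there is a character that never appears in device names (here |); if there is none, you'd need to change the serialization method.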
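Two details of the joined-string approach are easy to get wrong, so here is a plain-Java sketch of them. First, String.split takes a regex, and an unescaped "|" matches the empty string, which splits the list into single characters. Second, reduce keeps every occurrence of a device, so the string grows with the number of records. The hypothetical addDevice helper below keeps only distinct devices; assuming default String serdes, it could be plugged into the windowed stream with .aggregate(() -> "", (ip, device, agg) -> DeviceListAggregation.addDevice(agg, device)) in place of reduce. All names here are illustrative, not part of Kafka.

```java
import java.util.*;
import java.util.regex.Pattern;

// Helpers for a "|"-joined device list (hypothetical names, for illustration).
public class DeviceListAggregation {

    static final String DELIMITER = "|";

    // String.split takes a regex and a bare "|" matches the empty string,
    // so the delimiter is quoted before compiling.
    static final Pattern SPLITTER = Pattern.compile(Pattern.quote(DELIMITER));

    // Adds a device to the joined list only if it is not already present,
    // so the aggregate grows with the number of distinct devices, not records.
    public static String addDevice(String joined, String device) {
        if (joined.isEmpty()) {
            return device;
        }
        Set<String> devices = new LinkedHashSet<>(Arrays.asList(SPLITTER.split(joined)));
        devices.add(device);
        return String.join(DELIMITER, devices);
    }

    // Number of distinct devices in a joined list.
    public static int distinctCount(String joined) {
        return joined.isEmpty() ? 0 : new HashSet<>(Arrays.asList(SPLITTER.split(joined))).size();
    }

    public static void main(String[] args) {
        String agg = "";
        for (String d : new String[] {"aa-bb-cc", "aa-bb-cc", "dd-ee-ff"}) {
            agg = addDevice(agg, d);
        }
        System.out.println(agg);                // aa-bb-cc|dd-ee-ff
        System.out.println(distinctCount(agg)); // 2
        // Unescaped "|" splits into characters: the distinct ones are a, -, b, c
        System.out.println(new HashSet<>(Arrays.asList("aa-bb-cc".split("|"))).size()); // 4
    }
}
```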

Upvotes: 1
