Reputation: 1
My idea is:
First groupByKey on the (ip, device) pair so that each pair is unique, then map it to [ip, device] with ip as the key and device as the value. Then groupByKey again; I think the resulting count is the number of devices corresponding to each ip.
The Kafka records are:
key value(ip,deviceId)
1 127.0.0.1,aa-bb-cc
2 127.0.0.1,aa-bb-cc
3 127.0.0.1,aa-bb-cc
.....(more, but all values are 127.0.0.1,aa-bb-cc)
I want to get the number of distinct deviceIds per ip within each hopping time window.
code:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> records = builder.stream(topic);
KStream<String, String> formatRecoed = records.map(new KeyValueMapper<String, String, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(value, key);
}
});
formatRecoed.groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
String[] keys = key.split(",");
return new KeyValue<>(keys[0], keys[1]);
}
}).groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
return new KeyValue<>(key, "" + value);
}
}).to("topic");
The expected result for each time window is:
key value
127.0.0.1@1543495068000/1543495188000 1
127.0.0.1@1543495074000/1543495194000 1
127.0.0.1@1543495080000/1543495200000 1
But my running result is:
127.0.0.1@1543495068000/1543495188000 3
127.0.0.1@1543495074000/1543495194000 4
127.0.0.1@1543495080000/1543495200000 1
Why is that?
I am looking forward to someone helping me.
Upvotes: 0
Views: 696
Reputation: 13926
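There are two windowed aggregations in your code, and this is likely the cause of the issue. The first count() produces a KTable, and toStream() forwards every update of that table as a separate record, so the second count() counts updates rather than distinct (ip, device) pairs. I'd propose this flow: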
// requires java.util.Arrays and java.util.HashSet
records.map((key, value) -> {
    String[] data = value.split(","); // value is "ip,deviceId"
    return KeyValue.pair(data[0], data[1]);
})
.groupByKey() // by IP
.windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60))
.reduce((device1, device2) -> device1 + "|" + device2)
.toStream() // stream of "|"-joined device lists per IP in window
// split() takes a regex, so the "|" has to be escaped
.mapValues(devices -> new HashSet<>(Arrays.asList(devices.split("\\|")))) // set of devices
.mapValues(set -> String.valueOf(set.size()));
The resulting KStream is a windowed stream of (IP, count(distinct(devices))) pairs, with the count as a string, so you can forward it to another topic (the key is a Windowed<String>, so map it to a plain string or supply a windowed serde first). The method assumes that there is a character that never appears in device names (here |); if there is none, you'd need to change the serialization method.
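The delimiter-based distinct count can be tried out in plain Java, without a Kafka cluster. This is only a minimal sketch: the class name DistinctDevices and the helpers append and countDistinct are hypothetical names made up for illustration, and the code assumes that | never occurs inside a device id. It also shows why the delimiter has to be quoted before it is passed to split, which interprets its argument as a regular expression.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class DistinctDevices {
    // Assumption: this character never appears inside a device id.
    static final String DELIMITER = "|";

    // Same shape as the reducer in the flow: concatenate device ids.
    static String append(String devices, String device) {
        return devices + DELIMITER + device;
    }

    // Count the distinct ids in a DELIMITER-joined string.
    static int countDistinct(String devices) {
        // Pattern.quote makes split treat "|" literally instead of as regex alternation.
        String[] ids = devices.split(Pattern.quote(DELIMITER));
        Set<String> unique = new HashSet<>(Arrays.asList(ids));
        return unique.size();
    }

    public static void main(String[] args) {
        // Three records from the same device, as in the question's sample data.
        String sameDevice = append(append("aa-bb-cc", "aa-bb-cc"), "aa-bb-cc");
        System.out.println(countDistinct(sameDevice)); // prints 1

        // A second, made-up device id for the same ip.
        String twoDevices = append(sameDevice, "dd-ee-ff");
        System.out.println(countDistinct(twoDevices)); // prints 2
    }
}
```

Note that the joined string grows with every record in the window, so for busy IPs it may be worth aggregating into a set with a custom serde instead; that variant is not shown here.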
Upvotes: 1