Reputation: 13
every day we will get below messages in Student-Topic-In
Message 1: {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1", "class":"1"}
Message 2: {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1", "class":"1"}
Message 3: {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1", "class":"2"}
Message 4: {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1", "class":"2"}
Message 5: {"StudentID": "5", "StudentName":"eee","fatherName":"eee1", "class":"2"}
And the end of the day(once in a day) based on each class, we have to consolidate all the messages and publish to "Student-Topic-Out" in below format.
Message 1:{"Class":"1"
{"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1"},
{"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1"}
}
Message 2:{"Class":"2"
{"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1"},
{"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1"},
{"StudentID": "5", "StudentName":"eee","fatherName":"eee1"}
}
I tried the following, but don't know how to create a student list without class name?
KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
sampleStream
.filter((k, v) -> v != null)
.mapValues(v -> (Student) v)
.groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
.windowedBy(TimeWindows.of(5000))
//I am not sure how to create a student list without Classname
.aggregate(Student::new, (k, v, list) -> (Student)list.add((Student)v)
Could you please let me know, how to construct output JSON message Kafka Streams?
Upvotes: 1
Views: 1535
Reputation: 20820
You can aggregate the messages in a list in following way :
KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
KTable<Windowed<key>,List<Student>> aggregatedTable = sampleStream
.filter((k, v) -> v != null)
.mapValues(v -> (Student) v)
.groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
.windowedBy(TimeWindows.of(5000))
//I am not sure how to create a student list without Classname
.aggregate(ArrayList::new, (k, v, list) -> list.add((Student)v,
Materialized.with(keySerde(), arrayListSerde())
)
Once you get the List<Student>
, it can be converted to any desired format using .mapValues()
function.
Upvotes: 1
Reputation: 62285
You can do a KStream.groupBy(...).windowedBy().aggregate().mapValues
using the "class" attribute for grouping.
In the Aggregator()
you can assemble a List
of students that you transform into JSON
in mapValues()
Upvotes: 3