How to transform multiple messages into single message?

Every day we receive messages like the following on the topic Student-Topic-In:

Message 1: {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1",  "class":"1"}
Message 2: {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1",  "class":"1"}
Message 3: {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1",  "class":"2"}
Message 4: {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1",  "class":"2"}
Message 5: {"StudentID": "5", "StudentName":"eee","fatherName":"eee1",  "class":"2"}

At the end of the day (once a day), we have to consolidate all the messages by class and publish them to "Student-Topic-Out" in the format below.

Message 1:{"Class":"1"
          {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1"},
          {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1"}
       }
Message 2:{"Class":"2" 
          {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1"},
          {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1"},
          {"StudentID": "5", "StudentName":"eee","fatherName":"eee1"}
       }

I tried the following, but I don't know how to build a list of students that leaves out the class name:

KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
    sampleStream
            .filter((k, v) -> v != null)
            .mapValues(v -> (Student) v)
            .groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
            .windowedBy(TimeWindows.of(5000))
            //I am not sure how to create a student list without Classname
            .aggregate(Student::new, (k, v, list) -> (Student)list.add((Student)v)

Could you please let me know how to construct the output JSON message with Kafka Streams?

Upvotes: 1

Views: 1535

Answers (2)

Nishu Tayal

Reputation: 20820

You can aggregate the messages into a list in the following way:

KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
KTable<Windowed<String>, List<Student>> aggregatedTable = sampleStream
        .filter((k, v) -> v != null)
        .mapValues(v -> (Student) v)
        // groupBy takes a function that returns the new key, here the class name
        .groupBy((k, v) -> v.getClass_name())
        // 5-second window as in the question; a daily result needs a one-day window
        .windowedBy(TimeWindows.of(5000))
        .aggregate(
                ArrayList::new,
                (k, v, list) -> {
                    list.add(v); // List.add returns a boolean, so return the list explicitly
                    return list;
                },
                // keySerde() and arrayListSerde() stand for serdes that you have to supply
                Materialized.with(keySerde(), arrayListSerde()));

Once you have the List<Student>, you can convert it to any desired format with the mapValues() function.
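
As a rough sketch of that last step, the function passed to mapValues() could render one class and its student list as JSON. The StudentEntry class, the ClassJsonSketch class, and the "students" array name are assumptions for illustration (the output format in the question is not valid JSON as written, so an array key had to be picked). It uses no Kafka classes, so it runs on its own; in a real application a JSON library would be the safer choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical value type; field names follow the messages in the question.
final class StudentEntry {
    final String studentId;
    final String studentName;
    final String fatherName;

    StudentEntry(String studentId, String studentName, String fatherName) {
        this.studentId = studentId;
        this.studentName = studentName;
        this.fatherName = fatherName;
    }
}

final class ClassJsonSketch {
    // Renders one aggregated list as a JSON object.
    // No escaping is done, so this only suits values without quotes or backslashes.
    static String toJson(String className, List<StudentEntry> students) {
        String items = students.stream()
                .map(s -> "{\"StudentID\":\"" + s.studentId
                        + "\",\"StudentName\":\"" + s.studentName
                        + "\",\"fatherName\":\"" + s.fatherName + "\"}")
                .collect(Collectors.joining(","));
        return "{\"Class\":\"" + className + "\",\"students\":[" + items + "]}";
    }

    public static void main(String[] args) {
        List<StudentEntry> classOne = new ArrayList<>();
        classOne.add(new StudentEntry("1", "aaa", "aaa1"));
        classOne.add(new StudentEntry("2", "bbb", "bbb1"));
        System.out.println(toJson("1", classOne));
    }
}
```

With a windowed KTable, the class name is available from the windowed key, so a function like toJson could be called from inside the mapValues() step once the list has been aggregated.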

Upvotes: 1

Matthias J. Sax

Reputation: 62285

You can do a KStream.groupBy(...).windowedBy().aggregate().mapValues using the "class" attribute for grouping.

In the Aggregator you can assemble a List of students, which you then transform into JSON in mapValues().
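
To make the grouping and aggregation step concrete, here is a minimal sketch in plain Java collections, without any Kafka dependency. The names GroupByClassSketch, StudentRecord, addStudent and groupByClass are made up for illustration. addStudent has the same (key, value, aggregate) shape as a Kafka Streams Aggregator, and the map stands in for what groupBy plus aggregate would hold per class; windowing is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupByClassSketch {
    // Hypothetical record of one input message from the question.
    static final class StudentRecord {
        final String id;
        final String name;
        final String className;

        StudentRecord(String id, String name, String className) {
            this.id = id;
            this.name = name;
            this.className = className;
        }
    }

    // Aggregator-shaped function: add the new value to the running list and return the list.
    static List<StudentRecord> addStudent(String className, StudentRecord student,
                                          List<StudentRecord> soFar) {
        soFar.add(student);
        return soFar;
    }

    // Stand-in for groupBy + aggregate: one list per class, in arrival order.
    static Map<String, List<StudentRecord>> groupByClass(List<StudentRecord> input) {
        Map<String, List<StudentRecord>> byClass = new LinkedHashMap<>();
        for (StudentRecord s : input) {
            // The empty ArrayList plays the role of the initializer.
            List<StudentRecord> soFar = byClass.computeIfAbsent(s.className, k -> new ArrayList<>());
            byClass.put(s.className, addStudent(s.className, s, soFar));
        }
        return byClass;
    }

    public static void main(String[] args) {
        List<StudentRecord> input = new ArrayList<>();
        input.add(new StudentRecord("1", "aaa", "1"));
        input.add(new StudentRecord("2", "bbb", "1"));
        input.add(new StudentRecord("3", "ccc", "2"));
        groupByClass(input).forEach((cls, list) ->
                System.out.println("class " + cls + ": " + list.size() + " student(s)"));
    }
}
```

The point to notice is that the aggregator returns the list itself rather than the result of add(), which is a boolean.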

Upvotes: 3
