Anant Majhi

Reputation: 1

Kafka Streams API GroupBy behaviour

So I've been trying to aggregate some stream data into a KTable using Kafka Streams. The JSON on my topic looks like this:

{
   "id": "d04a6184-e805-4ceb-9aaf-b2ab0139ee84",
   "person": {
      "id": "d04a6184-e805-4ceb-9aaf-b2ab0139ee84",
      "createdBy": "user",
      "createdDate": "2023-01-01T00:28:58.161Z",
      "name": "person 1",
      "description": "test1"
   }
}
...

I extract the nested person object like this:
KStream<Object, String> firstStream = builder.stream("topic-1").mapValues(value -> {
  // parse the raw record and keep only the nested "person" object as the new value
  JSONObject json = new JSONObject(String.valueOf(value));
  return json.getJSONObject("person").toString();
});

Printing the resulting stream gives something like the following (every key is null):

null{"createdDate":"2023-01-01T00:28:58.161Z","createdBy":"user","name":"person 1","description":"test1","id":"d04a6184-e805-4ceb-9aaf-b2ab0139ee84"}
null{"createdDate":"2023-01-01T00:29:07.862Z","createdBy":"user","name":"person 2","description":"test 2","id":"48d8b895-eb27-4977-9dbc-adb8fbf649d8"}
null{"createdDate":"2023-01-01T00:29:12.261Z","createdBy":"anonymousUser","name":"person 2","description":"test 2 updated","id":"d8b895-eb27-4977-9dbc-adb8fbf649d8"}

I want to group this data so that "person 1" maps to its single JSON object and "person 2" maps to a list containing both of its JSON objects.

I have checked the question "Kafka Streams API GroupBy behaviour", which describes the same problem, but the solution given there doesn't work for me. Do I have to perform any extra operations? Please help.

Upvotes: 0

Views: 59

Answers (1)

OneCricketeer

Reputation: 191681

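To group, you need a key to group on, and your records currently have null keys. So extract the name of each person and use it as the key: either use map (or selectKey) followed by groupByKey, or return the name from the key selector you pass to groupBy.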
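A minimal plain-Java sketch of the re-keying step, modeling records as key/value pairs so it runs without a broker. The `Person` record, `pair`, `selectName`, and `rekey` are hypothetical names for illustration only; in a real topology the key-selection logic would be the lambda passed to `selectKey` or `groupBy`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RekeyByName {
    // Hypothetical stand-in for the parsed "person" JSON; only a few fields are modeled.
    record Person(String id, String name, String description) {}

    // Builds a key/value pair; AbstractMap.SimpleEntry allows a null key,
    // which matches the null keys in the question's output.
    static Map.Entry<String, Person> pair(String key, Person value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Key selector: ignores the old key and returns the person's name.
    // In Kafka Streams this would be the lambda given to
    // selectKey((key, value) -> ...) or groupBy((key, value) -> ...).
    static String selectName(String oldKey, Person value) {
        return value.name();
    }

    // Re-key every record by name; values are left unchanged.
    static List<Map.Entry<String, Person>> rekey(List<Map.Entry<String, Person>> records) {
        List<Map.Entry<String, Person>> out = new ArrayList<>();
        for (Map.Entry<String, Person> r : records) {
            out.add(pair(selectName(r.getKey(), r.getValue()), r.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // The three records from the question, all arriving with a null key.
        List<Map.Entry<String, Person>> input = List.of(
            pair(null, new Person("d04a6184-e805-4ceb-9aaf-b2ab0139ee84", "person 1", "test1")),
            pair(null, new Person("48d8b895-eb27-4977-9dbc-adb8fbf649d8", "person 2", "test 2")),
            pair(null, new Person("d8b895-eb27-4977-9dbc-adb8fbf649d8", "person 2", "test 2 updated")));

        for (Map.Entry<String, Person> e : rekey(input)) {
            System.out.println(e.getKey() + " -> " + e.getValue().description());
        }
        // person 1 -> test1
        // person 2 -> test 2
        // person 2 -> test 2 updated
    }
}
```

After re-keying, groupByKey() groups the stream by name; passing the same selector directly to groupBy(...) does the re-keying and grouping in one step.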

Then, as the linked answer says, you need to aggregate after grouping in order to combine the data for each person across events.
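A plain-Java sketch of what the aggregation step computes, assuming the records are already keyed by name. It models the KTable as a map from name to the latest aggregate; `INITIALIZER` and `aggregate` are hypothetical stand-ins for the Initializer and Aggregator you would pass to `aggregate(...)`, not the Kafka Streams types themselves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class AggregatePerName {
    // Hypothetical stand-in for the parsed "person" JSON.
    record Person(String id, String name, String description) {}

    // Initializer: the empty aggregate used the first time a key is seen.
    static final Supplier<List<Person>> INITIALIZER = ArrayList::new;

    // Aggregator: combine one new value with the current aggregate for its key.
    // Returns a new list instead of mutating the previous aggregate.
    static List<Person> aggregate(String key, Person value, List<Person> current) {
        List<Person> next = new ArrayList<>(current);
        next.add(value);
        return next;
    }

    // Fold records keyed by name into one list per name,
    // keeping only the latest aggregate for each key, as a KTable does.
    static Map<String, List<Person>> toTable(List<Map.Entry<String, Person>> keyed) {
        Map<String, List<Person>> table = new LinkedHashMap<>();
        for (Map.Entry<String, Person> r : keyed) {
            String key = r.getKey();
            List<Person> current = table.containsKey(key) ? table.get(key) : INITIALIZER.get();
            table.put(key, aggregate(key, r.getValue(), current));
        }
        return table;
    }

    public static void main(String[] args) {
        // The question's records after re-keying by name.
        List<Map.Entry<String, Person>> keyed = List.of(
            Map.entry("person 1", new Person("d04a6184-e805-4ceb-9aaf-b2ab0139ee84", "person 1", "test1")),
            Map.entry("person 2", new Person("48d8b895-eb27-4977-9dbc-adb8fbf649d8", "person 2", "test 2")),
            Map.entry("person 2", new Person("d8b895-eb27-4977-9dbc-adb8fbf649d8", "person 2", "test 2 updated")));

        toTable(keyed).forEach((name, people) ->
            System.out.println(name + " -> " + people.size() + " record(s)"));
        // person 1 -> 1 record(s)
        // person 2 -> 2 record(s)
    }
}
```

In an actual topology the aggregate type (a list) differs from the input value type, so the state store would most likely need its own value Serde, for example supplied through Materialized; which Serde to use for the list is left open here.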

By the way, you should set up the Streams config with a JsonSerde for values rather than a String Serde, so you don't have to parse each event manually.

Upvotes: 1
