Jayachandran P

Reputation: 35

Kafka streams foreign key join with one-many relations

There are two Kafka topics: news and images.

A message in the news topic can have a list of image IDs, as follows:

{
    "id": "news-1",
    "title": "Title news-1",
    "description": "description news-1",
    "author": "Author news-1",
    "imageIds": [
        "image-1",
        "image-2"
    ]
}

A message in the images topic looks like this:

{
    "id": "image-1",
    "url": "https://www.mypublication.co.uk/image-title-1.jpeg",
    "description": "title-1 description",
    "height": 400,
    "width": 450
}

{
    "id": "image-2",
    "url": "https://www.mypublication.co.uk/image-title-2.jpeg",
    "description": "title-2 description",
    "height": 400,
    "width": 450
}

I'm trying to join both of these streams to populate a final news message enriched with all the image details.

I tried using groupBy and aggregate as below

KTable<String, Image> images = builder.table(topics.getImagesTopic(), Consumed.with(Serdes.String(), imageSerde));
KStream<String, News> news = builder.stream(topics.getNewsTopic(), Consumed.with(Serdes.String(), newsSerde));

KTable<String, NewsImages> newsImagesKTable = news
    .flatMapValues(newsArticle -> newsArticle.getImageIds())
    .map((newsId, imageId) -> new KeyValue<>(imageId, newsId)) // rekey not good !!?
    .join(images, (newsId, image) -> new ImageWrapper(newsId, image),
        Joined.with(Serdes.String(), Serdes.String(), imageSerde))
    .groupBy((imageId, imageWrapper) -> imageWrapper.getNewsId(),
        Grouped.with(Serdes.String(), imageWrapperSerde))
    .aggregate(NewsImages::new, (newsId, imageWrapper, newsImages) -> {
        newsImages.setNewsId(newsId);
        newsImages.addImage(imageWrapper);
        return newsImages;
    }, Materialized.with(Serdes.String(), newsImagesSerde));

newsImagesKTable.toStream().to(topics.getNewsImagesTopic());

But, as expected, the code above aggregates all images ever seen for a news item.

When the author publishes the news for the first time with two images, it works well and we can see the output below:

"news-1": {
    "newsId": "news-1",
    "images": {
        "image-1": {"id": "image-1", "url": "https://www.mypublication.co.uk/image-1.jpeg", "description": "title-1 description", "height": 400, "width": 450},
        "image-2": {"id": "image-2", "url": "https://www.mypublication.co.uk/image-2.jpeg", "description": "title-2 description", "height": 400, "width": 450}
    }
}

When the author re-publishes the article with only image-3, it currently outputs all three images (that's what an aggregator does): news-1 : [image-1, image-2, image-3]

I'm looking for any alternative way to join news & images so that re-publishing the news overrides the previous values: news-1 : [image-3]

Upvotes: 1

Views: 878

Answers (1)

Matthias J. Sax

Reputation: 62285

Since Apache Kafka 2.4, foreign-key joins are a native DSL operator. For more details, check out https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#ktable-ktable-foreign-key-join
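As an illustration (not from the answer itself), the FK-join operator lets the left table extract a foreign key from its value and join against the other table's primary key. A minimal sketch, assuming the question's News/Image types and a hypothetical EnrichedNews result, and assuming a single image ID per news record -- the one-to-many case is exactly why the splitting described next is needed:

```java
// Sketch only: assumes newsSerde/imageSerde and an EnrichedNews type exist.
KTable<String, News> newsTable =
    builder.table("news", Consumed.with(Serdes.String(), newsSerde));
KTable<String, Image> imageTable =
    builder.table("images", Consumed.with(Serdes.String(), imageSerde));

// FK-join: the extractor pulls one foreign key out of the left-hand value,
// so a record with a *list* of imageIds cannot be joined directly.
KTable<String, EnrichedNews> enriched = newsTable.join(
    imageTable,
    news -> news.getImageIds().get(0),               // foreign-key extractor
    (news, image) -> new EnrichedNews(news, image)); // value joiner
```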

To make this work, you need to process both inputs as KTables for the join. For the left "news" input, though, you need to split each original input record into N records, one for each image you want to join, and later aggregate the final result.

The simplest way to "prepare" the left-hand side data might be a custom stateful Processor.

builder.stream("news")
       .process(....)
       .toTable()
       .join(imageTable)
       .groupBy(... -> news.id)
       .aggregate(...);

The process() step would be a flat-map that transforms each input row into N outputs, one per image, using <(news.id, image.id), ...> as the primary key, and also keeping the image.id as a field in the value, so it can be extracted as the FK for the join (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-1104%3A+Allow+Foreign+Key+Extraction+from+Both+Key+and+Value+in+KTable+Joins, which will avoid the need to have the image-id in the value).

The process() needs to be stateful to be able to produce deletes/tombstones. The state simply stores the full input record with news.id as the key. For each input record, the list of images must be compared to the old list of images: for each new image a regular output record is produced, and for each missing image a tombstone is produced.

E.g., the input

{
    "id": "news-1",
    "title": "Title news-1",
    "description": "description news-1",
    "author": "Author news-1",
    "imageIds": [
        "image-1",
        "image-2"
    ]
}

produces two output records, <(news-1,image-1), ...> and <(news-1,image-2), ...>. If we get an update for news-1:

{
    "id": "news-1",
    "title": "Title news-1",
    "description": "description news-1",
    "author": "Author news-1",
    "imageIds": [
        "image-1",
        "image-3"
    ]
}

the output will be <(news-1,image-2), null> (delete/tombstone) and <(news-1,image-3),...>.
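The diff step described above can be sketched in plain Java. Names like ImageDiff and the "newsId|imageId" composite key are illustrative only; the Kafka Processor wiring, the state store holding the previous record, and the context.forward() calls are elided:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Core of the hypothetical stateful processor: compare the previously stored
// image-id list with the new one and emit, per composite key "newsId|imageId",
// either the image id (regular upsert record) or null (tombstone).
public class ImageDiff {
    public static Map<String, String> diff(String newsId, List<String> oldIds, List<String> newIds) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String id : newIds) {
            if (!oldIds.contains(id)) {
                out.put(newsId + "|" + id, id);   // new image -> regular output record
            }
        }
        for (String id : oldIds) {
            if (!newIds.contains(id)) {
                out.put(newsId + "|" + id, null); // missing image -> tombstone
            }
        }
        return out; // unchanged images produce no output; the join table already has them
    }
}
```

With the example above, diff("news-1", [image-1, image-2], [image-1, image-3]) yields an upsert for (news-1,image-3) and a tombstone for (news-1,image-2), matching the expected output.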

Upvotes: 0
