Reputation: 61
I have a kafka stream - say for blogs and a kafka table - say for comments related to those blogs. Key from kafka stream can map to multiple values in Kafka table i.e. one blog can have multiple comments. I want to do a join of these two and create a new object with an array of comment ids. But when I do the join, the stream contains only the last comment id. Is there any documentation or example code which can point me right direction how to achieve this? Basically, is there any documentation elaborating how to do one to many relationship join using Kafka stream and Kafka table?
KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
(blogId, blog) -> blog.getBlogId(),
(blog, comment) -> new EnrichedBlog(blog, comment));
So instead of comment - I need to have an array of comment ids.
Upvotes: 6
Views: 4245
Reputation: 7170
As pointed out in the correct answer of Michal above, a KTable
keyed by blogId
cannot be used to keep track of the blogs in this case since only the latest blog value is retained in such table.
As a suggested optimization to the solution mentioned in his answer, note that keeping an ever growing List in the .aggregate()
can potentially become costly in both data size and time if there are a lot of comments per blog. This is because under the hood, each iteration of that aggregation leads to ever-growing instances of a List
, which is ok in java or scala because of data re-use, but which are each serialized separately to the underlying state-store. Schematically, assuming that some key has say 10 comments, then this expression is called 10 times:
(key, value, agg) -> new KeyValue<>(key, agg.add(value))
each time producing a list of size 1, then 2, then... ... then 10, each serialized independently to the under the hood state store, meaning that 1+2+3+...+10=55
values will be serialized in total (well, maybe there's some optimization s.t. some of those serializations are skipped, I don't know, although the space and time complexity is the same I think).
An alternative, though more complex, approach is to use range scans
in state stores, which makes the data structure look a bit like (partition_key, sort_key)
in key-value stores like DynamoDB, in which we store each comment with a key like (blogId, commentId)
. In that case you would still keyBy()
the comments stream by blogId
, then .transform(...)
it to pass it to the processor API, where you can apply the range scan idea, each time adding (i.e. serializing) one single supplementary comment to the state store instead of a new instance of the whole list.
The one-to-many relationship becomes very visible when we picture a lot instances of (blogId, commentId)
keys, all having the same blogId
and a different commentId
, all stored in the same state store instance in the same physical node, and this whole thing happening in parallel for a lot of blogId
in a lot of nodes.
I put more details about that pattern on my blog: One-to-many Kafka Streams Ktable join, and I put a full working example in github
Upvotes: 1
Reputation: 157
If you are using avro with schema registry, you should write your own aggregator because kafka stream fails to serialize ArrayList.
val kTable = aStream
.groupByKey()
.aggregate(
{
YourAggregator() // initialize aggregator
},
{ _, value, agg ->
agg.add(value) // add value to a list in YourAggregator
agg
}
)
And then join the kTable
with your other stream (bStream
).
bStream
.join(
kTable,
{ b, a ->
// do your value join from a to b
b
}
)
Sorry my snippets are written in Kotlin.
Upvotes: 2
Reputation: 4314
I fail to find a join method with a signature matching that in your code example, but here's what I think is the problem:
KTables are interpreted as a changlog, that is to say, every next message with the same key is interpreted as an update to the record, not as a new record. That is why you are seeing only the last "comment" message for a given key (blog id), the previous values are being overwritten. To overcome this, you'll need to change how you populate your KTable in the first place. What you can do is to add your comment topic as a KStream to your topology and then perform an aggregation that simply builds an array or a list of comments that share the same blog id. That aggregation returns a KTable which you can join your blog KStream with.
Here's a sketch how you can do it to construct a List-valued KTable:
builder.stream("yourCommentTopic") // where key is blog id
.groupByKey()
.aggregate(() -> new ArrayList(),
(key, value, agg) -> new KeyValue<>(key, agg.add(value)),
yourListSerde);
A list is easier to use in an aggregation than an array, so I suggest you convert it to an array downstream if needed. You will also need to provide a serde implementation for your list, "yourListSerde" in the example above.
Upvotes: 7