mike
mike

Reputation: 859

kafka-streams join produce duplicates

I have two topics:

// photos
{'id': 1, 'user_id': 1, 'url': 'url#1'},
{'id': 2, 'user_id': 2, 'url': 'url#2'},
{'id': 3, 'user_id': 2, 'url': 'url#3'}

// users
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'}

I create map photo by user

KStream<Integer, Photo> photo_by_user = ...

photo_by_user.to("photo_by_user")

Then, I try to join two tables:

KTable<Integer, User> users_table = builder.table("users");
KTable<Integer, Photo> photo_by_user_table = builder.table("photo_by_user");
KStream<Integer, Result> results = users_table.join(photo_by_user_table, (a, b) -> Result.from(a, b)).toStream();

results.to("results");

result like

{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}
{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}

I see that results are duplicated. Why, and how to avoid it?

Upvotes: 1

Views: 1892

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62350

You might hit a known bug. On "flush" KTable-KTable join might produce some duplicates. Note, that those duplicates are strictly speaking not incorrect, because the result is an update-stream and updating "A" to "A" does not change the result. It's of course undesired to get those duplicates. Try to disable caching -- without caching, the "flush issues" should not occur.

Upvotes: 6

Related Questions