ylka

Reputation: 293

KafkaStreams - joins are not always triggered

I have a Kafka Streams application where I am trying to join transactions to shops.

This works fine for new events, because shops are always created before transactions happen. But when I read "historic" data (events that happened before the application was started), sometimes all transactions are joined to shops and sometimes fewer, around 60%-80%, and once only 30%.

This is very weird, because I know that all transactions have a correct shop id: sometimes every one of them does get joined. The events are all in one topic, and I use filters to split them into two streams. Then I create a KTable from the shop stream and join the transaction stream to that table.

// shop-created events, re-keyed by shop_id
final KStream<String, JsonNode> shopStream = eventStream
            .filter((key, value) -> "SHOP_CREATED_EVENT".equals(value.path("eventType").textValue()))
            .map((key, value) -> KeyValue.pair(value.path("shop_id").textValue(), value));

// transaction events, re-keyed by shop_id
final KStream<String, JsonNode> transactionStream = eventStream
            .filter((key, value) -> "TRANSACTION_EVENT".equals(value.path("eventType").textValue()))
            .map((key, value) -> KeyValue.pair(value.path("shop_id").textValue(), value));

// latest shop state per shop_id
final KTable<String, JsonNode> shopTable = shopStream
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .reduce(genericReducer);

final KStream<String, JsonNode> joinedStream = transactionStream
            .join(shopTable, this::joinToShop, Joined.valueSerde(jsonSerde));

I also tried a stream-to-stream join instead of the stream-to-table join, with the same result:

final KStream<String, JsonNode> joinedStream = transactionStream
            .join(shopStream, this::joinToShop, JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), jsonSerde, jsonSerde));

Finally I write the joinedStream to an output topic:

joinedStream
            .map((key, value) -> KeyValue.pair(value.path("transactionId").textValue(), value))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), jsonSerde));

Then I create two key-value stores to count the number of original transactions and the number of joined ones:

    // count all source transactions under a single key
    transactionStream
            .map((key, value) -> KeyValue.pair(SOURCE_COUNT_KEY, value))
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .count(Materialized.as(SOURCE_STORE));


    // count all joined transactions under a single key
    joinedStream
            .map((key, value) -> KeyValue.pair(TARGET_COUNT_KEY, value))
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .count(Materialized.as(TARGET_STORE));

The filters work: when I print all events in shopStream and transactionStream, I can see that they all arrive, and the joins only start after all events have been printed.

I can also see that the shop-created event arrives before the transaction events for that shop. What is also weird is that sometimes, when I have 10 transactions to the same shop, 7 are joined correctly and 3 are missing (as an example).

The counts in the key-value stores are also correct, because the output topic contains the same number of events. The joinToShop() method is simply never triggered for the missing joins.
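
For reference, a minimal sketch of how the two counts can be read back and compared via interactive queries, assuming a running KafkaStreams instance named streams for the topology above (the two-argument streams.store() overload is the API of the Kafka versions in question here; it was deprecated later):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // Query both count stores once the application is in RUNNING state
    // and compare the totals under the two single-key counters.
    final ReadOnlyKeyValueStore<String, Long> sourceCounts =
            streams.store(SOURCE_STORE, QueryableStoreTypes.keyValueStore());
    final ReadOnlyKeyValueStore<String, Long> targetCounts =
            streams.store(TARGET_STORE, QueryableStoreTypes.keyValueStore());

    System.out.printf("transactions seen: %d, transactions joined: %d%n",
            sourceCounts.get(SOURCE_COUNT_KEY), targetCounts.get(TARGET_COUNT_KEY));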

So my question is: why is this happening? Why does it sometimes process all events and sometimes only part of them? And how can I make sure that all events get joined?

Upvotes: 0

Views: 835

Answers (1)

Matthias J. Sax

Reputation: 62285

Data is processed based on timestamps. However, in older releases Kafka Streams applies only a best-effort approach to reading data from different topics in timestamp order.

I would recommend upgrading to version 2.1 (or newer), which improves timestamp synchronization and should avoid the issue (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization).
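
For illustration, the behavior added by KIP-353 is controlled by the max.task.idle.ms config; a minimal sketch of setting it (the application id and the 500 ms value are example placeholders, not values from this question):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    // Let a task idle briefly when one of its input partitions has no buffered
    // data, so records can be picked in timestamp order across topics.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-shop-join"); // example id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); // example value; default is 0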

Upvotes: 1
