kindofdev

Kafka Streams timestamp synchronization in a KStream-KTable join

I have an inner KStream-KTable join with the following sequence of records:

table_evt_at_t1 --> stream_evt_at_t2 --> table_evt_at_t3 --> stream_evt_at_t4

The join produces:

(stream_evt_at_t2, table_evt_at_t1) + (stream_evt_at_t4, table_evt_at_t3)
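The result above is what you get if records are processed strictly in timestamp order. The following is a hypothetical, stdlib-only Java simulation of that behavior, not Kafka Streams code. It assumes that all four records share one key (called "k" here) and that t1 < t2 < t3 < t4. Table records are applied as upserts, and each stream record is joined with the table value that is current at that moment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of a KStream-KTable inner join; not Kafka Streams code.
class TimestampOrderedJoin {

    // One input record; isTable marks records from the KTable's source topic.
    static final class Event {
        final String key;
        final String value;
        final long timestamp;
        final boolean isTable;

        Event(String key, String value, long timestamp, boolean isTable) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.isTable = isTable;
        }
    }

    // Processes records strictly by timestamp: table records upsert the per-key
    // state, stream records are joined with the table value current at that point.
    static List<String> join(List<Event> events) {
        List<Event> ordered = new ArrayList<>(events);
        ordered.sort(Comparator.comparingLong(e -> e.timestamp));
        Map<String, String> table = new HashMap<>();
        List<String> results = new ArrayList<>();
        for (Event e : ordered) {
            if (e.isTable) {
                table.put(e.key, e.value);
            } else {
                String tableValue = table.get(e.key);
                // Inner join: a stream record without a table match produces nothing.
                if (tableValue != null) {
                    results.add("(" + e.value + ", " + tableValue + ")");
                }
            }
        }
        return results;
    }

    // The four records from the question; the shared key "k" is an assumption.
    static List<Event> sampleEvents() {
        return List.of(
            new Event("k", "table_evt_at_t1", 1L, true),
            new Event("k", "stream_evt_at_t2", 2L, false),
            new Event("k", "table_evt_at_t3", 3L, true),
            new Event("k", "stream_evt_at_t4", 4L, false));
    }

    public static void main(String[] args) {
        System.out.println(join(sampleEvents()));
    }
}
```

Running `main` prints `[(stream_evt_at_t2, table_evt_at_t1), (stream_evt_at_t4, table_evt_at_t3)]`.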

So far, everything is as expected. The unexpected result appears when I reset the Streams application (with kafka-streams-application-reset.sh) and replay all the events:

(stream_evt_at_t2, table_evt_at_t3) + (stream_evt_at_t4, table_evt_at_t3)

It seems that Kafka Streams does not take the timestamps into account when processing the events. It populates the KTable first and then processes the KStream, so both KStream events are joined with the latest KTable value (table_evt_at_t3).
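The replayed output matches a different processing order. This hypothetical, stdlib-only Java simulation does not use the Kafka Streams API. It uses the same assumed shared key "k" and consumes the whole table topic before any stream record, which is the order suspected above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of the processing order observed after the reset.
class TableFirstReplay {

    // Applies every table record first, then joins every stream record against
    // whatever the table holds at the end. Each record is a {key, value} pair.
    static List<String> replay(List<String[]> tableRecords, List<String[]> streamRecords) {
        Map<String, String> table = new HashMap<>();
        for (String[] kv : tableRecords) {
            table.put(kv[0], kv[1]); // a later value overwrites the earlier one for the same key
        }
        List<String> results = new ArrayList<>();
        for (String[] kv : streamRecords) {
            String tableValue = table.get(kv[0]);
            if (tableValue != null) { // inner join
                results.add("(" + kv[1] + ", " + tableValue + ")");
            }
        }
        return results;
    }

    // Records from the question, in offset order; the shared key "k" is an assumption.
    static List<String[]> sampleTable() {
        return List.of(new String[] {"k", "table_evt_at_t1"}, new String[] {"k", "table_evt_at_t3"});
    }

    static List<String[]> sampleStream() {
        return List.of(new String[] {"k", "stream_evt_at_t2"}, new String[] {"k", "stream_evt_at_t4"});
    }

    public static void main(String[] args) {
        System.out.println(replay(sampleTable(), sampleStream()));
    }
}
```

Running `main` prints `[(stream_evt_at_t2, table_evt_at_t3), (stream_evt_at_t4, table_evt_at_t3)]`, which is the replay result reported in the question.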

Note that I am using Kafka Streams 2.3.1, a custom TimestampExtractor, and max.task.idle.ms = 10 * 1000L, as KIP-353 suggests.
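For reference, the two settings in the question correspond to the Kafka Streams configuration keys `default.timestamp.extractor` and `max.task.idle.ms`. Below is a minimal sketch using plain `java.util.Properties`. The application id, broker address, and extractor class `com.example.EventTimeExtractor` are hypothetical placeholders because the question does not name them.

```java
import java.util.Properties;

// Sketch of the configuration described in the question; values marked as
// placeholders are hypothetical.
class StreamsConfigSketch {

    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-join-app");        // placeholder
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        // Custom TimestampExtractor; the class name is a hypothetical placeholder.
        props.put("default.timestamp.extractor", "com.example.EventTimeExtractor");
        // KIP-353: let a task stay idle up to 10 s while some of its input
        // partitions have no buffered records, instead of processing ahead.
        props.put("max.task.idle.ms", 10 * 1000L);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().get("max.task.idle.ms"));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor together with the topology.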

Is this the expected behaviour?

Answers (1)

user1463359

  • The first result is expected behavior, since KStream-KTable joins are not windowed but timestamped: each stream record is joined with the table value that is current when the record is processed.
  • The result after a reset/replay is also expected behavior, since a KTable keeps only the latest value for a given key, and table_evt_at_t3 (which has already overwritten table_evt_at_t1) is that latest value.