Reputation: 143
The Streams DSL documentation includes a caveat about using the aggregate
method to transform a KGroupedTable → KTable, as follows (emphasis mine):
When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
My interpretation of that last line implies that one of three things can happen:
Here is the question I'm looking to get answered:
Are all 3 scenarios above actually possible when using the aggregate method on a KGroupedTable?
Or am I misinterpreting the documentation? For my use-case (detailed below), it would be ideal if the subtractor were always called before the adder.
Why is this question important?
If the adder and subtractor are non-commutative operations and the order in which they are executed can vary, you can end up with different results depending on which one runs first. A useful example of a non-commutative operation is aggregating records into a Set:
.aggregate[Set[Animal]](Set.empty)(
  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
)
In this example, for duplicated events, if the adder is called before the subtractor you would end up removing the value entirely from the set (which would be problematic for most use-cases I imagine).
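To make the problem concrete, here is a minimal sketch in plain Scala with no Kafka dependency. `SetOrderDemo`, `Animal`, and the `"zoo-1"` key are hypothetical names used only for illustration; the two functions simply mirror the adder and subtractor above and are applied by hand in both orders for a duplicated event (old value equal to new value).

```scala
object SetOrderDemo {
  // Hypothetical value type, for illustration only.
  final case class Animal(name: String)

  // Same shape as the adder/subtractor passed to aggregate above.
  val adder: (String, Animal, Set[Animal]) => Set[Animal] =
    (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue
  val subtractor: (String, Animal, Set[Animal]) => Set[Animal] =
    (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue

  val lion: Animal = Animal("lion")
  // Aggregate state before the duplicated event arrives.
  val current: Set[Animal] = Set(lion)

  // Duplicated event: the old value and the new value are both `lion`.
  val subtractThenAdd: Set[Animal] =
    adder("zoo-1", lion, subtractor("zoo-1", lion, current))
  val addThenSubtract: Set[Animal] =
    subtractor("zoo-1", lion, adder("zoo-1", lion, current))

  def main(args: Array[String]): Unit = {
    println(s"subtractor first: $subtractThenAdd")
    println(s"adder first:      $addThenSubtract")
  }
}
```

With the subtractor applied first the set still contains the lion; with the adder applied first the set ends up empty.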
Why am I doubting the documentation (assuming my interpretation of it is correct)?
The KTableProcessorSupplier that calls the user-supplied adder/subtractor functions appears to be this one: https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81 and on line 92, you can even see a comment saying "first try to remove the old value". Seems like this would answer my question definitively, right? Unfortunately, in my own testing, what I saw was that the process function itself is called twice; first with a Change<V> value that includes only the old value and then the process function is called again with a Change<V> value that includes only the new value. Unfortunately, I haven't been able to dig deep enough to find the internal code that is generating the old value record and the new value record (upon receiving an update) to determine if it actually produces those records in that order.
Upvotes: 4
Views: 1084
Reputation: 62285
Update (since Kafka Streams 3.5):
KIP-904 changes the implementation and improves the behavior.
Kafka Streams 3.4 or older:
The order is hard-coded (i.e., no race condition), but there is no guarantee that the order won't change in future releases without notice (i.e., it's not a public contract and no KIP is needed to change it). I guess there would be a Jira about it... But as a matter of fact, it does not really matter (details below).
For the three scenarios you mentioned, the 3rd one cannot happen though: Aggregators are executed in a single thread (per shard) and thus either the adder or subtractor is called first.
first with a Change value that includes only the old value and then the process function is called again with a Change value that includes only the new value.
In general, both records might be processed by different threads and thus it's not possible to send only one record. It's just that the TopologyTestDriver (TTD) simulates single-threaded execution, so both records always end up in the same processor.
Cf TopologyTestDriver sending incorrect message on KTable aggregations
However, the order actually only matters if both records really end up in the same processor (if the grouping key did not change during the upstream update).
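A simplified model of this, in plain Scala without any Kafka dependency: one upstream update is split into an "old value" record and a "new value" record, each keyed by its own grouping key. `Change` here is a hypothetical stand-in for the internal class, and `split`, `Animal`, and the zoo keys are invented for illustration. The old-first ordering is an assumption of the sketch, matching what the question observed in testing.

```scala
object RepartitionSketch {
  // Hypothetical stand-in for the internal Change<V>; not the real Kafka Streams class.
  final case class Change[V](newValue: Option[V], oldValue: Option[V])
  // Hypothetical value type; `zoo` plays the role of the grouping key.
  final case class Animal(name: String, zoo: String)

  // Splits one upstream update into the records sent downstream:
  // an "old value only" record first, then a "new value only" record (assumed order).
  def split[V, GK](oldValue: Option[V], newValue: Option[V])(
      groupKey: V => GK): List[(GK, Change[V])] = {
    val oldRecord = oldValue.map(v => groupKey(v) -> Change[V](None, Some(v)))
    val newRecord = newValue.map(v => groupKey(v) -> Change[V](Some(v), None))
    oldRecord.toList ++ newRecord.toList
  }

  // Grouping key unchanged: both records carry the same key.
  val sameKey: List[(String, Change[Animal])] =
    split(Some(Animal("lion", "zoo-1")), Some(Animal("lion", "zoo-1")))(_.zoo)

  // Grouping key changed: the two records carry different keys.
  val movedKey: List[(String, Change[Animal])] =
    split(Some(Animal("lion", "zoo-1")), Some(Animal("lion", "zoo-2")))(_.zoo)
}
```

In `sameKey` both records share the key `zoo-1` and would reach the same downstream aggregator, so their relative order is observable. In `movedKey` the subtract goes to `zoo-1` and the add goes to `zoo-2`, so no ordering between them is meaningful.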
Furthermore, the order actually depends not on the downstream aggregate implementation, but on the order of writes into the repartition topic of the groupBy(), and with multiple parallel upstream processors, those writes are interleaved anyway. Thus, in general, you should think of the "add" and "subtract" part as independent entities and not make any assumption about their order (also, even if the key did not change, both records might be interleaved by other records...)
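One possible way to act on this advice (not something prescribed here, just a sketch) is to choose an aggregate for which add and subtract commute, for example per-value counts instead of a plain Set. The names `CommutativeAggregate`, `Counts`, `bump`, and `members` below are hypothetical, and the code is plain Scala rather than actual Kafka Streams code.

```scala
object CommutativeAggregate {
  // Aggregate: how many times each value is currently present.
  type Counts = Map[String, Int]

  def adder(value: String, agg: Counts): Counts = bump(agg, value, 1)
  def subtractor(value: String, agg: Counts): Counts = bump(agg, value, -1)

  // Adjusts the count for `value`; entries that reach zero are dropped
  // so that equal states compare as equal maps.
  private def bump(agg: Counts, value: String, delta: Int): Counts = {
    val next = agg.getOrElse(value, 0) + delta
    if (next == 0) agg - value else agg.updated(value, next)
  }

  // The set view that the original Set-based aggregate would have exposed.
  def members(agg: Counts): Set[String] =
    agg.collect { case (value, count) if count > 0 => value }.toSet

  val start: Counts = Map("lion" -> 1)
  // Duplicated event (old == new == "lion"), applied in both orders.
  val subtractFirst: Counts = adder("lion", subtractor("lion", start))
  val addFirst: Counts = subtractor("lion", adder("lion", start))
}
```

Because integer increments and decrements commute, `subtractFirst` and `addFirst` are the same map, and `members` still reports the lion in both cases.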
The only guarantee provided is (given that you configured the producer correctly to avoid re-ordering during send()), that if the grouping key does not change, the send of the old and new value will not be re-ordered relative to each other. The order of the send is hard-coded in the upstream processor though:
Thus, the order of the downstream aggregate processor is actually meaningless.
Upvotes: 2