Reputation: 23
I have a KStream with data from topic to1 like this:
T1-KEY -> {T1}
T2-KEY -> {T2}
and a KTable, which I build with org.apache.kafka.streams.StreamsBuilder from a topic to2 whose messages look like this:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
The stream is then flat-mapped and grouped by key, so that the resulting KTable looks like this:
T1 -> { ["B1", "B2"] }
At a later time, the following message arrives in topic to2:
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
Now I would expect my KTable to reflect the changes and look like this:
T1 -> { ["B2"] }
but it looks like this:
T1 -> { ["B1", "B2"] }
I noticed that in my Aggregator<Tx-KEY, Bx, Set<Bx>> the last argument passed is the set ["B1", "B2"], even though when I peek before aggregating I only see one match, "B2".
Am I misunderstanding the aggregation, or what is happening here?
EDIT
I think I narrowed it down: apparently the aggregation's Initializer is only called the very first time. After that, the aggregator always receives the previous aggregate as its last argument, e.g.
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
}
where Set<Bx> aggregate is [] on the very first call (created via the initializer) but ["B1", "B2"] on the second call.
Any ideas?
EDIT 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}
EDIT 3
I can't simply flat-map, though, as I have to combine multiple Ax elements, e.g.
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
where I then expect a grouping like
T1 -> { ["B1", "B2"] }
and in the next iteration, when the message
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
arrives, I'd expect
T1 -> { ["B1"] }
..
Upvotes: 1
Views: 1876
Reputation: 4334
Notice how your aggregator only ever adds elements to the aggregate set. With this logic, the set for a given key can never shrink. I think you have flattened the stream too much in this case. I suggest you don't flatten it to the point where your messages are of the form (Tx-KEY key, Bx value), but instead keep them in set form: (Tx-KEY key, Set<Bx> value). You then don't need the aggregation at all.
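To make the "can never shrink" point concrete, here is a minimal sketch that replays the question's scenario through an add-only aggregator without any Kafka dependency. The class name AddOnlyAggregation, the replay helper, and the use of String in place of Tx-KEY and Bx are assumptions made for illustration only.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class AddOnlyAggregation {
    // Mirrors the aggregator from the question: it only adds to the previous aggregate.
    static Set<String> apply(String key, String value, Set<String> aggregate) {
        aggregate.add(value);
        return aggregate;
    }

    // Hypothetical helper: feeds the flattened values for one key through the
    // aggregator, starting from the empty set an Initializer would provide once.
    static Set<String> replay(String key, List<String> values) {
        Set<String> aggregate = new LinkedHashSet<>();
        for (String value : values) {
            aggregate = apply(key, value, aggregate);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        // The first A1 message flattens to B1 and B2; the update flattens to B2 only.
        System.out.println(replay("T1", List.of("B1", "B2", "B2"))); // prints [B1, B2]
    }
}
```

The update carrying only B2 leaves B1 in place, because nothing in the aggregator ever removes an element.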
To achieve that I suggest you transform the input set
"Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
into
T1 -> { ["B1", "B2"] }
by grouping on the "Rel" field using standard Java code (Collections or Streams API) inside the KStream flatMap call, so that you only ever emit messages with Set<Bx>-typed values on the KStream, not individual Bx-typed values.
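The grouping step could look roughly like the sketch below. It uses plain Java collections only; the class RelGrouping, the nested B type with its id and rel fields, and the groupByRel method are hypothetical names, since the question does not show the actual value classes.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RelGrouping {
    // Assumed shape of one entry in the "Set" array: an id plus the Tx it relates to.
    static final class B {
        final String id;
        final String rel;

        B(String id, String rel) {
            this.id = id;
            this.rel = rel;
        }
    }

    // Groups the entries of ONE incoming message by their "Rel" field.
    static Map<String, Set<String>> groupByRel(List<B> entries) {
        Map<String, Set<String>> grouped = new LinkedHashMap<>();
        for (B b : entries) {
            grouped.computeIfAbsent(b.rel, k -> new LinkedHashSet<>()).add(b.id);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // First message: both B1 and B2 relate to T1.
        System.out.println(groupByRel(List.of(new B("B1", "T1"), new B("B2", "T1")))); // prints {T1=[B1, B2]}
        // Updated message: only B2 remains.
        System.out.println(groupByRel(List.of(new B("B2", "T1")))); // prints {T1=[B2]}
    }
}
```

Inside the flatMap call, each entry of the returned map would become one output record keyed by the Tx key, so a newer message replaces the whole set for that key instead of adding to it. Note that this sketch only covers the set carried by a single message.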
Happy to elaborate if you provide the code for your current flatMap implementation.
Upvotes: 1