Andre

Reputation: 23

Kafka Streams - Aggregation with old state

I have a KStream with data from topic to1 like this:

T1-KEY -> {T1}
T2-KEY -> {T2}

and a KTable, which I build with org.apache.kafka.streams.StreamsBuilder from a topic to2 that looks like this:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..

The stream is then flatMapped and grouped by key, so that the resulting KTable looks like this:

T1 -> { ["B1", "B2"] }
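The flattening step can be sketched in plain Java to show the shape the stream has just before grouping. This is a stdlib-only illustration, not Kafka Streams code: the records `AMessage` and `BEntry` are hypothetical stand-ins for the real value types, and a `List<Map.Entry<String, String>>` plays the role of the `KeyValue` pairs a `flatMap` would return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FlattenSketch {
    // Hypothetical value types mirroring a to2 message: {"A1", "Set": [{"B1", "Rel": "T1"}, ...]}
    record BEntry(String name, String rel) {}
    record AMessage(String name, List<BEntry> entries) {}

    // Emit one (Rel, B) pair per entry: the flattened form that gets grouped by key
    static List<Map.Entry<String, String>> flatten(AMessage msg) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (BEntry b : msg.entries()) {
            out.add(Map.entry(b.rel(), b.name()));
        }
        return out;
    }

    public static void main(String[] args) {
        AMessage a1 = new AMessage("A1",
                List.of(new BEntry("B1", "T1"), new BEntry("B2", "T1")));
        System.out.println(flatten(a1)); // [T1=B1, T1=B2]
    }
}
```

Once the stream is in this shape, each record carries a single B, so the set for T1 has to be rebuilt downstream from individual records.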

Later, the following message arrives on topic to2:

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          } 

Now I would expect my KTable to reflect the changes and look like this:

T1 -> { ["B2"] }

but it looks like this:

T1 -> { ["B1", "B2"] }

I noticed that in my Aggregator<Tx-KEY, Bx, Set<Bx>>, the last argument passed in is the set ["B1", "B2"], even though when I peek before aggregating I only see one record, "B2".

Am I understanding the aggregation wrong or what is happening here?

EDIT

I think I narrowed it down: apparently the aggregation's Initializer is only called the very first time; after that, the aggregator always receives the previous aggregate as its last argument, e.g.

@Override
public Set<Bx> apply(TxKey key, Bx value, Set<Bx> aggregate) {
    // aggregate: [] on the first call, the previous result afterwards
}

where Set<Bx> aggregate is [] on the very first call (created via the initializer) but ["B1", "B2"] for the second call.
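What this edit observes matches how a grouped aggregation works: the Initializer is used only when a key has no aggregate yet, and every later record for that key is folded into the stored aggregate. The plain-Java simulation below sketches that contract, with a `HashMap` standing in for the state store. `SimulatedAggregation` and its methods are hypothetical and not Kafka Streams API; the aggregator logic is the add-only one from EDIT 2.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class SimulatedAggregation {
    // A HashMap stands in for the state store behind the aggregated KTable
    private final Map<String, Set<String>> store = new HashMap<>();
    int initializerCalls = 0;

    // Initializer: only consulted when the key has no aggregate yet
    private Set<String> initialize() {
        initializerCalls++;
        return new TreeSet<>();
    }

    // One record: reuse the stored aggregate if present, then apply the
    // add-only aggregator (same logic as MyAggregator) and store the result
    Set<String> process(String key, String value) {
        Set<String> aggregate = store.containsKey(key) ? store.get(key) : initialize();
        aggregate.add(value);
        store.put(key, aggregate);
        return aggregate;
    }

    public static void main(String[] args) {
        SimulatedAggregation agg = new SimulatedAggregation();
        // First A1 message, flattened: (T1, B1), (T1, B2)
        agg.process("T1", "B1");
        agg.process("T1", "B2");
        // Second A1 message, flattened: (T1, B2) only
        Set<String> result = agg.process("T1", "B2");
        // prints "[B1, B2] after 1 initializer call(s)"
        System.out.println(result + " after " + agg.initializerCalls + " initializer call(s)");
    }
}
```

Nothing in the second message tells the aggregator that B1 went away, so an add-only aggregator has no way to drop it.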

Any ideas?

EDIT 2

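import java.util.Set;
import org.apache.kafka.streams.kstream.Aggregator;

// TxKey and Bx are placeholders for the actual key and value types
public class MyAggregator implements Aggregator<TxKey, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(TxKey key, Bx value, Set<Bx> aggregate) {
        // Adds the incoming value to the running set for this key;
        // no value is ever removed from it
        aggregate.add(value);
        return aggregate;
    }
}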

EDIT 3

I can't just flatMap, though, because I have to combine multiple Ax elements, e.g.

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...

where I would then expect a grouping like

T1 -> { ["B1", "B2"] }

and in the next iteration, when the message

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

arrives, I'd expect

T1 -> { ["B1"] }

..
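Combining several Ax records under one T key while still letting an update shrink the result requires the previous value of the updated Ax key, so its old contribution can be retracted before the new one is added. Kafka Streams supports this on a grouped KTable through `KGroupedTable.aggregate`, which takes both an adder and a subtractor (not used in the code above). The stdlib-only simulation below sketches the same bookkeeping; `RetractingAggregation`, its methods, and the reference counts are hypothetical, and the counts are there because the same B could be contributed by more than one Ax.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class RetractingAggregation {
    // Hypothetical entry type: {"B1", "Rel": "T1"}
    record BEntry(String name, String rel) {}

    // Latest entry list per A key, i.e. the "old value" needed for retraction
    private final Map<String, List<BEntry>> lastByAKey = new HashMap<>();
    // Per T key: how many current A records contribute each B
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    // Adder: count one more contribution of this B under its Rel
    private void add(BEntry e) {
        counts.computeIfAbsent(e.rel(), k -> new TreeMap<>()).merge(e.name(), 1, Integer::sum);
    }

    // Subtractor: drop one contribution; remove the B when none are left
    private void subtract(BEntry e) {
        Map<String, Integer> perT = counts.get(e.rel());
        if (perT != null) {
            perT.computeIfPresent(e.name(), (k, n) -> n == 1 ? null : n - 1);
        }
    }

    // New value for an A key: retract its previous entries, then add the new ones
    void update(String aKey, List<BEntry> entries) {
        List<BEntry> previous = lastByAKey.put(aKey, entries);
        if (previous != null) {
            previous.forEach(this::subtract);
        }
        entries.forEach(this::add);
    }

    // Current set of Bs related to a T key
    Set<String> view(String tKey) {
        return new TreeSet<>(counts.getOrDefault(tKey, Map.of()).keySet());
    }

    public static void main(String[] args) {
        RetractingAggregation agg = new RetractingAggregation();
        agg.update("A1-KEY", List.of(new BEntry("B1", "T1")));
        agg.update("A2-KEY", List.of(new BEntry("B2", "T1")));
        System.out.println(agg.view("T1")); // [B1, B2]
        // Hypothetical update: A2-KEY no longer relates anything to T1
        agg.update("A2-KEY", List.of());
        System.out.println(agg.view("T1")); // [B1]
    }
}
```

The key design point is that the subtractor sees the old value of the updated Ax key; an aggregation over individual (Tx-KEY, Bx) records never has that information.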

Upvotes: 1

Views: 1876

Answers (1)

Michal Borowiecki

Reputation: 4334

Notice that your aggregator only ever adds elements to the aggregate set. With this logic, the set for a given key can never shrink. I think you have flattened the stream too much in this case. I suggest you don't flatten it to the point where your messages are of the form (Tx-KEY key, Bx value), but only so far that they retain their set form: (Tx-KEY key, Set<Bx> value). Then you don't need the aggregation at all. To achieve that, I suggest you transform the input set

"Set": [
     {"B1", "Rel": "T1"},
     {"B2", "Rel": "T1"}
]

into

T1 -> { ["B1", "B2"] }

by grouping on the "Rel" field with standard Java code (the Collections or Streams API) inside the KStream flatMap call, so that you only ever emit messages with Set<Bx>-typed values on the KStream, not individual Bx-typed values.
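A minimal sketch of that grouping step, using `Collectors.groupingBy` from the Java Streams API to turn one message's entries into Rel -> Set<B> before anything is emitted. `BEntry` is a hypothetical stand-in for the real entry type; inside a `flatMap`, each entry of the resulting map would become one output record keyed by Rel.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class GroupByRel {
    // Hypothetical entry type: {"B1", "Rel": "T1"}
    record BEntry(String name, String rel) {}

    // Collapse one message's "Set" into Rel -> set of B names,
    // so each Rel can be emitted as a single Set-valued record
    static Map<String, Set<String>> groupByRel(List<BEntry> entries) {
        return entries.stream()
                .collect(Collectors.groupingBy(
                        BEntry::rel,
                        Collectors.mapping(BEntry::name, Collectors.toSet())));
    }

    public static void main(String[] args) {
        List<BEntry> set = List.of(new BEntry("B1", "T1"), new BEntry("B2", "T1"));
        // A single T1 entry whose value contains both B1 and B2
        System.out.println(groupByRel(set));
    }
}
```

Because each record now carries the whole set for its Rel, a later message that drops B1 produces a smaller set instead of being merged into the old one.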

Happy to elaborate more if you provide the code for your current flatmap implementation.

Upvotes: 1
