Reputation: 855
I am trying to do a local aggregation.
The input topic has records containing multiple elements, and I am using flatMap
to split each record into multiple records with another key (here element_id).
This triggers a re-partition, since I apply a grouping for aggregation later in the stream.
Problem: there are way too many records in this repartition topic and the app cannot handle them (lag is increasing).
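Roughly, the current topology looks like this (a simplified sketch; the topic name, serdes, and the helpers that flatten and merge the nested maps are placeholders, not the real code):
final StreamsBuilder builder = new StreamsBuilder();

builder.stream("input_topic", Consumed.with(Serdes.String(), nestedCountsSerde))
    // fan out: one output record per element_id, value = counts per category,
    // so the key changes from "another ID" to element_id
    .flatMap((anotherId, categories) -> toPerElementRecords(categories))
    // grouping after a key change forces a repartition topic that contains every
    // single fanned-out record; this is the topic the app cannot keep up with
    .groupByKey(Grouped.with(Serdes.String(), perCategoryCountsSerde))
    .reduce((countsA, countsB) -> mergeCounts(countsA, countsB));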
Here is an example of the incoming data:
key: another ID
value:
{
  "cat_1": {
    "element_1": 0,
    "element_2": 1,
    "element_3": 0
  },
  "cat_2": {
    "element_1": 0,
    "element_2": 1,
    "element_3": 1
  }
}
And here is an example of the desired aggregation result:
key: element_2
value:
{
  "cat_1": 1,
  "cat_2": 1
}
So I would like to do a first "local aggregation" and stop splitting incoming records: aggregate all elements locally (without a re-partition), for example over a 30-second window, then produce one result per element to a topic. A stream consuming this topic would later aggregate at a higher level.
I am using the Streams DSL, but I am not sure it is enough. I tried to use the process()
and transform()
methods that give access to the Processor API, but I don't know how to properly produce records from a punctuation, or how to put records back into a stream.
How could I achieve that? Thank you
Upvotes: 2
Views: 531
Reputation: 1418
transform()
returns a KStream
on which you can call to()
to write the results into a topic.
stream.transform(...).to("output_topic");
In a punctuation you can call context.forward()
to send a record downstream. You still need to call to()
to write the forwarded record into a topic.
To implement a custom aggregation, consider the following pseudo-ish code:
final StreamsBuilder builder = new StreamsBuilder();

final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.Integer(),
                                Serdes.Integer());
builder.addStateStore(keyValueStoreBuilder);

final KStream<Integer, Integer> stream =
    builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));

stream.transform(() ->
    new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {

        private KeyValueStore<Integer, Integer> state;

        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            context.schedule(
                Duration.ofMinutes(1),
                PunctuationType.STREAM_TIME,
                timestamp -> {
                    // You can read the aggregates from the state store here
                    // and send them downstream with context.forward().
                    // Alternatively, you can output the aggregate in the
                    // transform() method as shown below.
                }
            );
        }

        @Override
        public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
            // Get the existing aggregate from the state store with state.get(),
            // update it, and write it back with state.put().
            // Depending on some condition, e.g., 10 seen records,
            // output an aggregate downstream by returning it here;
            // return null to emit nothing for this input record.
            // You can output multiple aggregates by using KStream#flatTransform().
            // Alternatively, you can output the aggregate in a
            // punctuation as shown above.
            return null;
        }

        @Override
        public void close() {
        }
    }, stateStoreName)
    .to("output_topic", Produced.with(Serdes.Integer(), Serdes.Integer()));
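For example, the punctuation registered in init() above could iterate the state store and forward every aggregate downstream. A minimal sketch of that lambda body, assuming the state and context variables from the snippet above (whether you also clear the store afterwards depends on your windowing semantics):
try (final KeyValueIterator<Integer, Integer> iterator = state.all()) {
    while (iterator.hasNext()) {
        final KeyValue<Integer, Integer> aggregate = iterator.next();
        // key = element id, value = its current aggregate
        context.forward(aggregate.key, aggregate.value);
    }
}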
With this manual pre-aggregation in place, you could implement the higher-level aggregation in the same Streams app and still leverage re-partitioning.
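For example, instead of writing the pre-aggregates to a topic with to(), you could keep going in the same topology. A minimal sketch, assuming the Transformer above is extracted into a hypothetical preAggregateTransformer supplier (the serdes and output topic name are also illustrative):
stream
    // local pre-aggregation with the Transformer shown above
    .transform(preAggregateTransformer, stateStoreName)
    // transform() may change the key, so the grouping below triggers the
    // re-partition, but now only the pre-aggregated records go through it
    .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Integer()))
    // higher-level aggregation across all partitions
    .reduce(Integer::sum)
    .toStream()
    .to("element_aggregates", Produced.with(Serdes.Integer(), Serdes.Integer()));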
process()
is a terminal operation, i.e., it does not return anything.
Upvotes: 1