gnos
gnos

Reputation: 855

kafka stream make a local aggregation

I am trying to make a local aggregation.

The input topic has records containing multiple elements and I am using flatmap to split the record into multiple records with another key (here element_id). This triggers a re-partition as I am applying a grouping for aggregation later in the stream process. Problem: there are way too many records in this repartition topic and the app cannot handle them (lag is increasing).

Here is a example of the incoming data

key: another ID

value:

{
  "cat_1": {
    "element_1" : 0,
    "element_2" : 1,
    "element_3" : 0
  },
  "cat_2": {
    "element_1" : 0,
    "element_2" : 1,
    "element_3" : 1
  }
}

And an example of the wanted aggregation result: key : element_2 value:

{
  "cat_1": 1,
  "cat_2": 1
}

So I would like to make a first "local aggregation" and stop splitting incoming records, meaning that I want to aggregate all elements locally (no re-partition) for example in a 30 seconds window, then produce result per element in a topic. A stream consuming this topic later aggregates at a higher level.

I am using Stream DSL, but I am not sure it is enough. I tried to use the process() and transform() methods that allow me to benefit from the Processor API, but I don't known how to properly produce some records in a punctuation, or put records in a stream.

How could I achieve that ? Thank you

Upvotes: 2

Views: 531

Answers (1)

Bruno Cadonna
Bruno Cadonna

Reputation: 1418

transform() returns a KStream on which you can call to() to write the results into a topic.

stream.transform(...).to("output_topic");

In a punctuation you can call context.forward() to send a record downstream. You still need to call to() to write the forwarded record into a topic.

To implement a custom aggregation consider the following pseudo-ish code:

builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.Integer(),
                                Serdes.Integer());
builder.addStateStore(keyValueStoreBuilder);

stream = builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(() -> 
    new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {

    private KeyValueStore<Integer, Integer> state;

    @Override
    public void init(final ProcessorContext context) {
        state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
        context.schedule(
            Duration.ofMinutes(1), 
            PunctuationType.STREAM_TIME, 
            timestamp -> {
                // You can get aggregates from the state store here 

                // Then you can send the aggregates downstream 
                // with context.forward();

                // Alternatively, you can output the aggregate in the 
                // transform() method as shown below
            }
        );
    }

    @Override
    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
        // Get existing aggregates from the state store with state.get().

        // Update aggregates and write them into the state store with state.put().

        // Depending on some condition, e.g., 10 seen records, 
        // output an aggregate downstream by returning the output.
        // You can output multiple aggregates by using KStream#flatTransform().

        // Alternatively, you can output the aggregate in a 
        // punctuation as shown above
    }

    @Override
    public void close() {
    }
}, stateStoreName)

With this manual aggregation you could implement the higher level aggregation in the same streams app and leverage re-partitioning.

process() is a terminal operation, i.e., it does not return anything.

Upvotes: 1

Related Questions