Raf

Reputation: 842

How to reprocess a batched Kafka Stream

For this to work, the Processor API seems more or less fitting (a la KStream batch process windows):

public void process(String key, SensorData sensorData) {
    // epochTime is rounded down to our preferred time window (1 minute)
    long epochTime = sensorData.epochTime;
    String windowKey = String.valueOf(epochTime);

    ArrayList<SensorData> data = messageStore.get(windowKey);
    if (data == null) {
        data = new ArrayList<SensorData>();
    }

    data.add(sensorData);
    messageStore.put(windowKey, data);
    this.context.commit();
}

@Override
public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(60000); // equal to 1 minute
}

@Override
public void punctuate(long streamTime) {
    KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
    while (it.hasNext()) {
        KeyValue<String, ArrayList<SensorData>> entry = it.next();
        this.context.forward(entry.key, entry.value);
    }
    it.close();
    // reset messageStore
}
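The comment in `process()` assumes `epochTime` has already been rounded down to the start of its 1-minute window. A minimal sketch of that alignment (the class and method names here are illustrative, not from the original):

```java
// Hypothetical helper: align an epoch timestamp (ms) to the start of
// its 1-minute window, as the comment in process() assumes.
public class WindowAlignment {
    static final long WINDOW_SIZE_MS = 60_000L;

    static long windowStart(long epochMs) {
        // drop the remainder so all timestamps in the same minute share a key
        return epochMs - (epochMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long ts = 1_000_000_123L;            // some epoch timestamp in ms
        System.out.println(windowStart(ts)); // prints 999960000
    }
}
```

All records whose timestamps land in the same minute then map to the same `windowKey` in the state store.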

However, this approach has one major downside: we don't use Kafka Streams windows.

Taking these points into consideration, I should use Kafka Streams windows. But this is only possible in the Kafka Streams DSL...

Any thoughts on this would be awesome.

Upvotes: 3

Views: 968

Answers (1)

Matthias J. Sax

Reputation: 62330

You can mix-and-match the DSL and the Processor API using process(), transform(), or transformValues() within the DSL (there are some other SO questions about this already, so I do not elaborate further). Thus, you can use the regular window construct in combination with a custom (downstream) operator to hold the result back (and deduplicate). Some deduplication will already happen automatically within your window operator (as of Kafka 0.10.1; see http://docs.confluent.io/current/streams/developer-guide.html#memory-management), but if you want exactly one result, the cache will not do it for you.
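The mix described above might look roughly like this topology fragment. This is only a sketch: the topic names, the `DedupTransformer` class, and the serde configuration are assumptions, not from the answer, and the API shown is from a newer Kafka Streams release than the one the question targets:

```java
import java.time.Duration;
import java.util.ArrayList;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

// Sketch: DSL windowing plus a custom downstream operator.
// Serdes, Materialized config, and error handling are omitted.
StreamsBuilder builder = new StreamsBuilder();

builder.<String, SensorData>stream("sensor-input")
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
       // collect each window's records into a list
       .aggregate(ArrayList<SensorData>::new,
                  (key, value, agg) -> { agg.add(value); return agg; })
       .toStream()
       // hypothetical custom operator that holds results back and
       // deduplicates, emitting once per window
       .transform(DedupTransformer::new)
       .to("sensor-output");
```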

About punctuate: it is triggered based on progress (i.e., stream-time) and not based on wall-clock time -- so if you reprocess old data, it will be called the exact same number of times as in your original run (just closer together in wall-clock time, since you process old data faster). There are also some SO questions about this if you want to get more details.
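A toy model (plain Java, not Kafka code) of this stream-time behavior: punctuations fire once per interval of *record timestamps*, so replaying the same records always fires the same punctuations, no matter how fast they are consumed:

```java
import java.util.ArrayList;
import java.util.List;

public class StreamTimePunctuation {
    // Simulate stream-time punctuation: fire once per intervalMs of
    // observed record timestamps, independent of wall-clock time.
    static List<Long> punctuations(long[] recordTimestamps, long intervalMs) {
        List<Long> fired = new ArrayList<>();
        long nextFire = -1;
        for (long ts : recordTimestamps) {
            if (nextFire < 0) nextFire = ts + intervalMs;
            while (ts >= nextFire) { // stream-time advanced past the deadline
                fired.add(nextFire);
                nextFire += intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Records spanning 3 minutes of stream-time, replayed instantly:
        long[] ts = {0, 30_000, 60_000, 90_000, 120_000, 180_000};
        System.out.println(punctuations(ts, 60_000)); // [60000, 120000, 180000]
    }
}
```

Reprocessing the same `ts` array yields the same three punctuations, which is the point Matthias makes about replay.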

However, a general consideration: why do you need exactly one result? If you do stream processing, you might want to build your downstream consumer application to be able to handle updates to your result. This is the inherent design of Kafka: using changelogs.

Upvotes: 2
