Mark Lavin

Reputation: 1242

How do I combine a Kafka Processor with a Kafka Streams application?

I am trying to construct a Kafka Streams operator that takes in a Stream<timestamp, data> and outputs another Stream in which the timestamps are sorted in ascending order. The purpose is to deal with streams that have "out of order" entries caused by delays in the supplier.

At first, I thought about doing this with time-windowed aggregation, but then I happened upon a solution using a Kafka Processor. I figured I could then say something like:

class SortProcessor implements Processor<timestamp,data> ...
class SortProcessorSupplier ...supplies suitably initialized SortProcessor
KStream<timestamp,data> input_stream = ...sourced from "input_topic"
KStream<timestamp,data> output_stream =
        input_stream.process( new SortProcessorSupplier(...parameters...) );

However, this doesn't work because KStream.process returns void.

So, my question is: How do I "wrap" the Processor so that I can use it as follows:

KStream<timestamp,data> input_stream = ...sourced from "input_topic"
KStream<timestamp,data> output_stream =
        new WrappedSortProcessor( input_stream, ...parameters... )

Upvotes: 0

Views: 362

Answers (1)

Michal Borowiecki

Reputation: 4334

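Instead of a Processor you can use a Transformer, which is very similar to a Processor but is better suited to forwarding results on to the stream. You invoke it from the stream with KStream.transform() instead of process(). Unlike process(), transform() returns a new KStream containing the records the Transformer emits, so you can assign its result to output_stream.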
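To illustrate the sorting part, here is a minimal sketch of the buffering logic such a Transformer might hold. It is plain Java with no Kafka dependency, and everything in it is an assumption for illustration: the class name ReorderBuffer, the maxDelay parameter, and the rule of releasing records once they are older than the newest timestamp seen minus maxDelay are hypothetical and do not come from the Kafka Streams API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: buffers records by
// timestamp and releases them in ascending order once they are older than
// the newest timestamp seen minus an allowed delay.
class ReorderBuffer<V> {
    private final long maxDelay;
    // TreeMap keeps timestamps sorted; each list holds records sharing a timestamp.
    private final TreeMap<Long, List<V>> pending = new TreeMap<>();
    private long maxSeen = Long.MIN_VALUE;

    ReorderBuffer(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Add one record, then return every buffered record whose timestamp is
    // at or below (maxSeen - maxDelay), in ascending timestamp order.
    List<Map.Entry<Long, V>> offer(long timestamp, V value) {
        pending.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        maxSeen = Math.max(maxSeen, timestamp);
        return drainUpTo(maxSeen - maxDelay);
    }

    // Release everything still buffered, for example at shutdown.
    List<Map.Entry<Long, V>> flush() {
        return drainUpTo(Long.MAX_VALUE);
    }

    private List<Map.Entry<Long, V>> drainUpTo(long watermark) {
        List<Map.Entry<Long, V>> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            Map.Entry<Long, List<V>> e = pending.pollFirstEntry();
            for (V v : e.getValue()) {
                ready.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), v));
            }
        }
        return ready;
    }
}
```

Inside a Transformer, transform(key, value) could call offer() and pass each returned entry to ProcessorContext.forward(key, value). Note that a record arriving more than maxDelay behind the newest timestamp is released immediately, so it can still appear out of order. For fault tolerance the pending records would probably need to live in a state store rather than in a field.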

Upvotes: 1
