Reputation: 1242
I am trying to construct a Kafka Streams operator that takes in a Stream<timestamp, data> and outputs another Stream in which the timestamps are sorted in ascending order. The purpose is to deal with streams that have "out of order" entries due to delays in the supplier.
At first, I thought about doing this with time-windowed aggregation, but then I happened upon a solution using a Kafka Processor. I figured I could then say something like:
class SortProcessor implements Processor<timestamp,data> ...
class SortProcessorSupplier ...supplies suitably initialized SortProcessor
KStream<timestamp,data> input_stream = ...sourced from "input_topic"
KStream<timestamp,data> output_stream =
input_stream.process( new SortProcessorSupplier(...parameters...) );
However, this doesn't work because KStream.process returns void.
So, my question is: How do I "wrap" the Processor so that I can use it as follows:
KStream<timestamp,data> input_stream = ...sourced from "input_topic"
KStream<timestamp,data> output_stream =
new WrappedSortProcessor( input_stream, ...parameters... )
Upvotes: 0
Views: 362
Reputation: 4334
Instead of a Processor you can use a Transformer, which is very similar to a Processor but is better suited to forwarding results on to the stream. You can then invoke it from the stream using the KStream.transform() method instead of process().
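As a rough illustration of the sorting part, here is a minimal sketch of the buffering logic such a Transformer could delegate to. It is plain Java with no Kafka dependency. The class name ReorderBuffer, its methods, and the maxDelayMs bound on lateness are all hypothetical names and assumptions made for this example; they are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not a Kafka class): buffers records and releases
 * them in ascending timestamp order once they are older than an assumed
 * maximum delay relative to the newest timestamp seen so far.
 */
class ReorderBuffer<V> {
    private final long maxDelayMs; // assumption: how late a record may arrive
    private final TreeMap<Long, List<V>> pending = new TreeMap<>();
    private long maxSeen = Long.MIN_VALUE;

    ReorderBuffer(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Adds one record; returns the records now safe to emit, oldest first. */
    List<Map.Entry<Long, V>> add(long timestamp, V value) {
        pending.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        maxSeen = Math.max(maxSeen, timestamp);
        return drainUpTo(maxSeen - maxDelayMs);
    }

    /** Releases everything still buffered, oldest first (e.g. on shutdown). */
    List<Map.Entry<Long, V>> flush() {
        return drainUpTo(Long.MAX_VALUE);
    }

    // Removes and returns all buffered records with timestamp <= watermark.
    private List<Map.Entry<Long, V>> drainUpTo(long watermark) {
        List<Map.Entry<Long, V>> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            Map.Entry<Long, List<V>> oldest = pending.pollFirstEntry();
            for (V v : oldest.getValue()) {
                // Map.entry rejects null values; fine for this sketch.
                ready.add(Map.entry(oldest.getKey(), v));
            }
        }
        return ready;
    }
}
```

Inside a Transformer, transform() could call add() for each incoming record and forward every returned entry through the ProcessorContext, with close() or a punctuator calling flush(). Note that a record arriving more than maxDelayMs late is released immediately and therefore still comes out of order, so it would need to be dropped or handled separately. This in-memory buffer is also lost on restart, so a state store would probably be a better fit in a real topology.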
Upvotes: 1