Reputation: 31515
In many Big Data situations it is preferable to work with a small buffer of records at a time, rather than one record at a time.
The natural example is calling some external API that supports batching for efficiency.
How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.
So far I have:
builder.stream[String, String]("my-input-topic")
  .mapValues(externalApiCall)
  .to("my-output-topic")
What I want is:
builder.stream[String, String]("my-input-topic")
  .batched(chunkSize = 2000)
  .map(externalBatchedApiCall)
  .to("my-output-topic")
In Scala and Akka Streams the function is called grouped or batch. In Spark Structured Streaming we can do mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall)).
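(For comparison, a minimal runnable sketch of the Akka Streams grouped operator mentioned above, written with the Java DSL since the answers below are in Java; the names here are illustrative only, and it assumes Akka 2.6+.)

import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

ActorSystem system = ActorSystem.create("grouped-demo");
Source.range(1, 10_000)
      .grouped(2000) // emits List<Integer> batches of up to 2000 elements
      .runWith(Sink.foreach(batch -> System.out.println(batch.size())), system);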
Upvotes: 10
Views: 15625
Reputation: 79
You could use a queue, something like below:
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.stereotype.Component;

import static org.apache.kafka.streams.processor.PunctuationType.WALL_CLOCK_TIME;

// AbstractStreamProcessor is this answer's own base class; it supplies streamsBuilder.
@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {

    public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
        super(configuration);
    }

    @Override
    Topology buildTopology() {
        KStream<String, String> kStream =
                streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
        // .peek((key, value) -> log.info("message received by stream 0"));
        kStream.process(() -> new AbstractProcessor<String, String>() {
            // Bounded queue: when it fills up, the batch is drained immediately.
            final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
            final List<String> collection = new ArrayList<>();

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                // Also drain on a wall-clock schedule, so a quiet topic cannot
                // leave records sitting in the queue indefinitely.
                context.schedule(Duration.of(1, ChronoUnit.MINUTES), WALL_CLOCK_TIME, timestamp -> {
                    processQueue();
                    context().commit();
                });
            }

            @Override
            public void process(String key, String value) {
                queue.add(value);
                if (queue.remainingCapacity() == 0) {
                    processQueue();
                }
            }

            public void processQueue() {
                queue.drainTo(collection);
                // Iterate explicitly rather than stream().peek(...).count():
                // since Java 9, count() may skip peek() entirely when the
                // stream size is already known.
                if (!collection.isEmpty()) {
                    collection.forEach(System.out::println); // the batched external call would go here
                    System.out.println("count is " + collection.size());
                    collection.clear();
                }
            }
        });
        // The original records are forwarded unchanged; process() above only
        // forks them off for the batched side effect.
        kStream.to("normalTopic1");
        return streamsBuilder.build();
    }
}
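Note that AbstractProcessor and the process(key, value) signature above were deprecated in Kafka Streams 3.0. Here is a minimal sketch of the same queue-and-drain idea on the newer processor.api.Processor interface (assuming Kafka Streams 3.3+, where KStream.process accepts it; BatchingProcessor and BATCH_SIZE are illustrative names, not from the original answer):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class BatchingProcessor implements Processor<String, String, Void, Void> {

    private static final int BATCH_SIZE = 100;

    private final List<String> batch = new ArrayList<>(BATCH_SIZE);
    private ProcessorContext<Void, Void> context;

    @Override
    public void init(ProcessorContext<Void, Void> context) {
        this.context = context;
        // Flush at least once a minute even if the batch never fills up.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> flush());
    }

    @Override
    public void process(Record<String, String> record) {
        batch.add(record.value());
        if (batch.size() >= BATCH_SIZE) {
            flush();
        }
    }

    private void flush() {
        if (!batch.isEmpty()) {
            // externalBatchedApiCall(batch); // the question's batched call would go here
            System.out.println("count is " + batch.size());
            batch.clear();
            context.commit();
        }
    }
}

// Wired in with: kStream.process(BatchingProcessor::new);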
Upvotes: 3
Reputation: 31515
Doesn't seem to exist yet. Watch this space: https://issues.apache.org/jira/browse/KAFKA-7432
Upvotes: 6
Reputation: 20820
I doubt that Kafka Streams supports fixed-size (count-based) windows like some other tools do, at the moment.
But there are time-based windows, supported by Kafka Streams: https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
Instead of a number of records, you define the window size by time.
In your case, a tumbling time window could be an option. Tumbling windows are non-overlapping, fixed-size time windows.
For example, tumbling windows with a size of 5000 ms have predictable window boundaries [0;5000),[5000;10000),... and not [1000;6000),[6000;11000),... or even something "random" like [1452;6452),[6452;11452),....
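A minimal sketch of that idea (my own, not from the linked docs): collect values into one batch per key per tumbling window and hand each batch to the question's externalBatchedApiCall. It assumes Kafka Streams 2.8+ for Serdes.ListSerde; the topic name is the question's.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();

// Collect values into one List per key per 5000 ms tumbling window.
KTable<Windowed<String>, List<String>> batches =
        builder.stream("my-input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ZERO))
               .aggregate(
                       () -> (List<String>) new ArrayList<String>(), // empty batch per window
                       (key, value, batch) -> { batch.add(value); return batch; },
                       Materialized.with(Serdes.String(),
                               Serdes.ListSerde(ArrayList.class, Serdes.String())));

// Emit each batch once, when its window closes, rather than on every update.
batches.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
       .toStream()
       .foreach((windowedKey, batch) -> externalBatchedApiCall(batch)); // the question's batched call

Note the trade-off: this batches by time and key rather than by a fixed record count, so batch sizes vary with throughput.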
Upvotes: 0