samthebest

Reputation: 31515

How to process data in chunks/batches with Kafka Streams?

For many situations in Big Data it is preferable to work with a small buffer of records at a go, rather than one record at a time.

The natural example is calling some external API that supports batching for efficiency.

How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.

So far I have:

builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")

What I want is:

builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")

In Scala and Akka Streams the function is called grouped or batch. In Spark Structured Streaming we can do mapPartitions(_.grouped(2000).map(externalBatchedApiCall)).
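For comparison, here is roughly what the Akka Streams version looks like in the Java DSL (Akka 2.6 style; records and externalBatchedApiCall are hypothetical stand-ins):

import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

ActorSystem system = ActorSystem.create("batching");
Source.from(records)                        // records: any Iterable<String>
      .grouped(2000)                        // emits List<String> chunks of up to 2000 elements
      .map(batch -> externalBatchedApiCall(batch))
      .runWith(Sink.foreach(System.out::println), system);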

Upvotes: 10

Views: 15625

Answers (3)

Rajesh Rai

Reputation: 79

You could use a queue. Something like below:

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

// AbstractStreamProcessor (and its streamsBuilder field) is our own base class.
@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {

    public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
        super(configuration);
    }

    @Override
    Topology buildTopology() {
        KStream<String, String> kStream = streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
        kStream.process(() -> new AbstractProcessor<String, String>() {
            // Buffer up to 100 records; drain when full, or at the latest on the punctuator below.
            final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
            final List<String> collection = new ArrayList<>();

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                // Flush once a minute so a slow topic does not leave records stuck in the queue.
                context.schedule(Duration.of(1, ChronoUnit.MINUTES), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                    processQueue();
                    context().commit();
                });
            }

            @Override
            public void process(String key, String value) {
                queue.add(value);
                if (queue.remainingCapacity() == 0) {
                    processQueue();
                }
            }

            // Drain the queue and handle the contents as one batch. (Avoid
            // collection.stream().peek(...).count() here: on Java 9+ count()
            // may skip the peek() side effect entirely.)
            public void processQueue() {
                queue.drainTo(collection);
                if (!collection.isEmpty()) {
                    collection.forEach(System.out::println); // stand-in for the batched call
                    System.out.println("batch size is " + collection.size());
                    collection.clear();
                }
            }
        });
        kStream.to("normalTopic1");
        return streamsBuilder.build();
    }

}
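Note one tradeoff in this design: process() is a terminal operation, so kStream.to("normalTopic1") forwards the original, un-batched records, and the drained batches are only printed. To emit the batched results downstream instead, use transform() with context.forward(), as sketched under the next answer.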

Upvotes: 3

samthebest

Reputation: 31515

Doesn't seem to exist yet. Watch this space https://issues.apache.org/jira/browse/KAFKA-7432
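In the meantime you can buffer records yourself with the Processor API. Below is a minimal sketch in the Java DSL, not a drop-in implementation: externalBatchedApiCall is the hypothetical client from the question, and the in-memory buffer is lost on restart unless you back it with a state store.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

KStream<String, String> input = builder.stream("my-input-topic");
input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    final int chunkSize = 2000;
    final List<String> buffer = new ArrayList<>(chunkSize);
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        buffer.add(value);
        if (buffer.size() >= chunkSize) {
            // One batched call per 2000 records; forward each result downstream.
            for (String result : externalBatchedApiCall(buffer)) {
                context.forward(key, result);
            }
            buffer.clear();
        }
        return null; // output is emitted via forward(), not the return value
    }

    @Override
    public void close() {
    }
}).to("my-output-topic");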

Upvotes: 6

Nishu Tayal

Reputation: 20820

I doubt that Kafka Streams supports fixed-size (count-based) windows the way other tools do, at the moment. But there are time-based windows supported by Kafka Streams: https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing

Instead of a number of records, you define the window size in terms of time.

  1. Tumbling time windows
  2. Sliding time windows
  3. Session windows
  4. Hopping time windows

In your case, a tumbling time window could be the option to use. Those are non-overlapping, fixed-size time windows.

For example, tumbling windows with a size of 5000 ms have predictable window boundaries [0;5000),[5000;10000),..., not [1000;6000),[6000;11000),... or even something “random” like [1452;6452),[6452;11452),....
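For instance, a 5-second tumbling count in the Java DSL looks like the sketch below (topic name taken from the question; the exact TimeWindows factory method varies by Kafka version):

import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("my-input-topic");

// Non-overlapping 5000 ms buckets: [0;5000), [5000;10000), ...
// (newer releases spell this TimeWindows.ofSizeWithNoGrace(...))
KTable<Windowed<String>, Long> counts = stream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
        .count();

Keep in mind this batches by time rather than by record count, so the number of records per window varies with throughput.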

Upvotes: 0
