yashaswini jayshankar
yashaswini jayshankar

Reputation: 64

delay function in kafka streams

was trying something with the kafka streams code and wanted to add delay or something like threads.sleep() for 1ms after splitting data....I m confused how to do that..can someone help me out in doing that?

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
    .mapValues(value -> value.replace("[",""));
    .mapValues(value -> value.replace("]",""));
    .mapValues(value -> value.replaceAll("\\},\\{" ,"\\}\\},\\{\\{"))
    .flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
mstream.to("OUTTOPIC");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

So after .flatmapvalues statement I need to add a thread.sleep() for 1ms so what can be my statement there..?

Upvotes: 1

Views: 2755

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62330

Not sure what you want to achieve, but it seems you want to slow down processing? Than you can just put a sleep into your use code. For this, your lambda expression must call "sleep" before it returns the actual result. As an alternative, you can also add an additional .foreach() or peek() call and sleep there.

Upvotes: 2

Related Questions