Duy Đạt Phạm
Duy Đạt Phạm

Reputation: 23

Get the last records of KStream

I'm very new to Kafka Stream API. I have a KStream like this:

    KStream<Long,String> joinStream = builder.stream(("output"));

The KStream with records value look like this: enter image description here

The stream will be updated every 1s. I need to build a Rest API that will be calculated based on the value profit and spotPrice. But I've struggled to get the value of the last record.

Upvotes: 1

Views: 1233

Answers (1)

Felipe
Felipe

Reputation: 7563

I am assuming that you mean the max value of the stream when you say the last value as the values are continuously arriving. Then you can use the reduce transformation to always update the output stream with the max value.

final StreamsBuilder builder = new StreamsBuilder();
KStream<Long, String> stream = builder.stream("INPUT_TOPIC", Consumed.with(Serdes.Long(), Serdes.String()));
stream
      .mapValues(value -> Long.valueOf(value))
      .groupByKey()
      .reduce(new Reducer<Long>() {
            @Override
            public Long apply(Long currentMax, Long v) {
                  return (currentMax > v) ? currentMax : v;
              }
      })
      .toStream().to("OUTPUT_TOPIC");
return builder.build();

And in case that you want to retrive it in a rest api i suggest to take a look at Spring cloud + Kafka streams (https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html) that you can exchange messages to spring web.

Upvotes: 1

Related Questions