Reputation: 23
I'm very new to the Kafka Streams API. I have a KStream like this:
KStream<Long, String> joinStream = builder.stream("output");
The record values in the KStream look like this:
The stream is updated every second. I need to build a REST API whose response is calculated from the profit and spotPrice values, but I'm struggling to get the value of the last record.
Upvotes: 1
Views: 1233
Reputation: 7563
I am assuming that by "the last value" you mean the max value of the stream, since the values are arriving continuously. In that case you can use the reduce transformation to keep the output stream updated with the max value.
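import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

static Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<Long, String> stream = builder.stream("INPUT_TOPIC", Consumed.with(Serdes.Long(), Serdes.String()));
    stream
        // Parse each String value as a Long so values can be compared numerically.
        .mapValues(value -> Long.valueOf(value))
        // The value type is now Long, so pass explicit serdes instead of relying on the defaults.
        .groupByKey(Grouped.with(Serdes.Long(), Serdes.Long()))
        // Per key, keep the larger of the running maximum and the new value.
        .reduce((currentMax, v) -> (currentMax > v) ? currentMax : v)
        .toStream()
        .to("OUTPUT_TOPIC", Produced.with(Serdes.Long(), Serdes.Long()));
    return builder.build();
}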
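To make the per-key behaviour of the reducer easier to see, here is a minimal sketch in plain Java with no Kafka dependency. The class `ReduceSketch`, the helper `reduceByKey`, and the sample records are all made up for illustration and are not part of the Kafka Streams API; the helper only imitates how a reduce folds values per key in arrival order. It also shows that, if "last" literally means the most recent record per key, the same shape works with a reducer that returns the newer value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceSketch {

    /**
     * Hypothetical helper (not a Kafka Streams call): folds records per key in
     * arrival order. The first value seen for a key is stored as-is; each later
     * value is combined with the stored one via reducer(stored, incoming).
     */
    static Map<Long, Long> reduceByKey(List<Map.Entry<Long, Long>> records,
                                       BinaryOperator<Long> reducer) {
        Map<Long, Long> state = new LinkedHashMap<>();
        for (Map.Entry<Long, Long> r : records) {
            state.merge(r.getKey(), r.getValue(), reducer);
        }
        return state;
    }

    public static void main(String[] args) {
        // Made-up sample data: (key, value) pairs in arrival order.
        List<Map.Entry<Long, Long>> records = List.of(
            Map.entry(1L, 5L), Map.entry(1L, 9L), Map.entry(2L, 4L), Map.entry(1L, 7L));

        // Running maximum per key, same logic as the reducer in the answer.
        System.out.println(reduceByKey(records, (currentMax, v) -> currentMax > v ? currentMax : v));
        // prints {1=9, 2=4}

        // Most recent value per key: the reducer just keeps the newer value.
        System.out.println(reduceByKey(records, (previous, latest) -> latest));
        // prints {1=7, 2=4}
    }
}
```

In the actual topology, swapping the max reducer for `(previous, latest) -> latest` would give the latest value per key instead of the maximum.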
If you want to retrieve it in a REST API, I suggest taking a look at Spring Cloud + Kafka Streams (https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html), with which you can pass messages on to Spring Web.
Upvotes: 1