Reputation: 939
I have a Spring Boot application that I am integrating with Apache Flink. I want to read data from Kafka and expose it through a REST endpoint.
Below is my endpoint code:
@GetMapping("/details/{personName}")
public String getPersonDetails() throws Exception {
    StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "group_id");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
            new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(consumer);
    stream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String value) throws Exception {
            logger.info(value);
            return value;
        }
    }).print();
    env.execute();
    return "hello world";
}
Below is my sample data:
{"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
First problem: how can I apply a filter and return the result as JSON? For example, if the input from the REST call is "John", I want to group the records by name, sum the weight values, and return JSON containing only the name and the total weight.
Second problem: I can't stop the execution environment. Are there any alternatives? I checked the Flink documentation but didn't find anything that fits my situation.
Third problem: I want the environment to be loaded eagerly. I tried using a static block, but that also takes a long time.
NFRs:
I have a massive amount of data in Kafka, so the solution needs to scale and process it quickly.
Upvotes: 0
Views: 537
Reputation: 9245
It sounds like you might need to spend more time reviewing the Flink documentation. But in a nutshell...
1. Use a MapFunction that parses the string into JSON, extracts the name and weight, and outputs that as a Tuple2<String, Integer> or some custom Java class.
2. Use a ProcessFunction that sums the weight and saves it in state.
3. Use QueryableState to expose the state (the summed weights) to code that's running as part of your program's main() method.
4. Use a QueryableStateClient to get the weight for a given name.
Upvotes: 2
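As a rough illustration of what the MapFunction and ProcessFunction in the answer above would compute, here is a minimal plain-Java sketch with no Flink dependency. The class name WeightSumSketch and its helper methods are hypothetical, the regex-based parsing assumes flat records whose values are all quoted strings (as in the sample data), and the HashMap-style map only stands in for Flink keyed state. A real job would more likely use a JSON library and Flink's own state APIs.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class; none of these names come from Flink.
class WeightSumSketch {
    // Matches one "key":"value" pair in a flat JSON object with string values only.
    private static final Pattern FIELD =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    // The MapFunction's job: extract (PersonName, weight) from one record.
    static Map.Entry<String, Integer> parse(String record) {
        String name = null;
        Integer weight = null;
        Matcher m = FIELD.matcher(record);
        while (m.find()) {
            if (m.group(1).equals("PersonName")) {
                name = m.group(2);
            } else if (m.group(1).equals("weight")) {
                weight = Integer.valueOf(m.group(2));
            }
        }
        if (name == null || weight == null) {
            throw new IllegalArgumentException("Missing PersonName or weight: " + record);
        }
        return new AbstractMap.SimpleImmutableEntry<>(name, weight);
    }

    // The ProcessFunction's job: keep a running sum per name.
    // The map is a stand-in for keyed state.
    static Map<String, Integer> sumByName(Iterable<String> records) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String record : records) {
            Map.Entry<String, Integer> e = parse(record);
            totals.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return totals;
    }

    // What the REST endpoint would return: name and total weight only.
    // Assumes the name contains no characters that need JSON escaping;
    // an unknown name is reported with a total of 0.
    static String toJson(Map<String, Integer> totals, String name) {
        int total = totals.getOrDefault(name, 0);
        return "{\"PersonName\":\"" + name + "\",\"weight\":" + total + "}";
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "{\"id\":\"1\",\"PersonName\":\"John\",\"weight\":\"34\"}",
                "{\"id\":\"2\",\"PersonName\":\"Mann\",\"weight\":\"88\"}",
                "{\"id\":\"4\",\"PersonName\":\"John\",\"weight\":\"44\"}",
                "{\"id\":\"5\",\"PersonName\":\"John\",\"weight\":\"26\"}");
        // prints {"PersonName":"John","weight":104}
        System.out.println(toJson(sumByName(records), "John"));
    }
}
```

In a Flink job, parse would live inside the MapFunction, the merge step inside a function on the stream keyed by name, and toJson in the REST handler that queries the state.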