prostý člověk

Reputation: 939

Converting a Flink DataStream and Exposing It via a REST Endpoint

I have a Spring Boot application that I am integrating with Apache Flink. I want to read data from Kafka and expose it through a REST endpoint.

Below is my simple code:

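    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class PersonController {

        // Static, so the anonymous MapFunction below never references the controller instance,
        // which Flink would otherwise try (and fail) to serialize.
        private static final Logger logger = LoggerFactory.getLogger(PersonController.class);

        @GetMapping("/details/{personName}")
        public String getPersonDetails(@PathVariable("personName") String personName) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "group_id");

            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
                    new SimpleStringSchema(), properties);
            consumer.setStartFromEarliest();
            DataStream<String> stream = env.addSource(consumer);

            // Log and print each raw record; personName is not used yet.
            stream.map(new MapFunction<String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public String map(String value) throws Exception {
                    logger.info(value);
                    return value;
                }
            }).print();

            // The Kafka source is unbounded, so execute() blocks until the job is cancelled
            // and the return statement below is never reached.
            env.execute();

            return "hello world";
        }
    }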

My problem is as follows. The Kafka topic contains records like these:

    {"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
    {"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
    {"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
    {"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
    {"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
    {"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}

How can I filter and aggregate these records and return the result as JSON? For example, if the input to the REST call is "John", I want to group John's records, sum their weight values, and return JSON containing only the name and the total weight.
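
To pin down the expected response, here is a minimal plain-Java sketch with no Flink, Kafka, or Spring involved. `PersonWeightQuery`, `field`, and `totalWeightJson` are hypothetical names. It assumes every field is a quoted string without nesting or escaped quotes, as in the sample records, and pulls fields out with a regular expression; real code would use a JSON library such as Jackson. For "John" the weights are 34 + 44 + 26 = 104.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java sketch of the requested REST response; no Flink, Kafka, or Spring involved.
final class PersonWeightQuery {

    // The question's sample records, trimmed to a few fields.
    static final List<String> SAMPLE = List.of(
            "{\"id\":\"1\",\"PersonName\":\"John\",\"address\":\"Bristol\",\"weight\":\"34\"}",
            "{\"id\":\"2\",\"PersonName\":\"Mann\",\"address\":\"Bristol\",\"weight\":\"88\"}",
            "{\"id\":\"3\",\"PersonName\":\"Chris\",\"address\":\"Leeds\",\"weight\":\"12\"}",
            "{\"id\":\"4\",\"PersonName\":\"John\",\"address\":\"Bristol\",\"weight\":\"44\"}",
            "{\"id\":\"5\",\"PersonName\":\"John\",\"address\":\"NewPort\",\"weight\":\"26\"}",
            "{\"id\":\"6\",\"PersonName\":\"Mann\",\"address\":\"Bristol\",\"weight\":\"89\"}");

    // Hypothetical helper: extracts one quoted string value from a flat JSON object.
    // Assumes no nesting and no escaped quotes; use a JSON library in real code.
    static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("missing field " + name + " in " + json);
        }
        return m.group(1);
    }

    // Keeps only the records for personName, sums their weights, and returns name + total as JSON.
    static String totalWeightJson(List<String> records, String personName) {
        long total = 0;
        for (String json : records) {
            if (field(json, "PersonName").equals(personName)) {
                total += Long.parseLong(field(json, "weight"));
            }
        }
        // personName is not JSON-escaped here, for brevity.
        return "{\"PersonName\":\"" + personName + "\",\"weight\":" + total + "}";
    }

    public static void main(String[] args) {
        System.out.println(totalWeightJson(SAMPLE, "John")); // prints {"PersonName":"John","weight":104}
    }
}
```

This recomputes the total from all records on every call, which is exactly what does not scale to a large topic; the streaming answer keeps a running total per name instead.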

Non-functional requirements (NFRs):

The topic holds a large volume of data, so the solution needs to scale and process it quickly.

Upvotes: 0

Views: 537

Answers (1)

kkrugler

Reputation: 9245

It sounds like you might need to spend more time reviewing the Flink documentation. But in a nutshell...

  1. Add a MapFunction that parses the string into JSON, extracts the name and weight, and outputs that as a Tuple2<String, Integer> or some custom Java class.
  2. Do a keyBy(name field), the DataStream API's counterpart of groupBy, followed by a KeyedProcessFunction that sums the weights and saves the total in keyed state.
  3. Use QueryableState to expose the state (the summed weights) to code that's running as part of your program's main() method.
  4. In your main method, implement a REST handler that uses the QueryableStateClient to get the weight for a given name.
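
To show how work is divided across these steps, here is a single-threaded, in-memory model in plain Java. It is an analogy, not Flink code: `KeyedWeightModel`, `onEvent`, and `query` are hypothetical names, a `HashMap` stands in for keyed state, `Map.Entry` stands in for `Tuple2<String, Integer>`, and `query` plays the role of the point lookup a `QueryableStateClient` would perform against the running job. Parsing uses the same simplifying assumption as a regex over flat, quoted fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Single-threaded, in-memory model of the four steps. It is NOT Flink code: the HashMap
// stands in for Flink's keyed state, and query() stands in for a QueryableStateClient lookup.
final class KeyedWeightModel {

    private static final Pattern NAME = Pattern.compile("\"PersonName\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern WEIGHT = Pattern.compile("\"weight\"\\s*:\\s*\"(\\d+)\"");

    // One running total per key (person name), like a per-key ValueState<Long>.
    private final Map<String, Long> totalByName = new HashMap<>();

    // Step 1: parse a raw record into (name, weight), the role Tuple2<String, Integer> plays in the answer.
    static Map.Entry<String, Integer> parse(String json) {
        Matcher name = NAME.matcher(json);
        Matcher weight = WEIGHT.matcher(json);
        if (!name.find() || !weight.find()) {
            throw new IllegalArgumentException("unparseable record: " + json);
        }
        return Map.entry(name.group(1), Integer.parseInt(weight.group(1)));
    }

    // Step 2: key by name and update only that key's total, as a keyed process function would per event.
    void onEvent(String json) {
        Map.Entry<String, Integer> nw = parse(json);
        totalByName.merge(nw.getKey(), (long) nw.getValue(), Long::sum);
    }

    // Steps 3-4: a point lookup by key, which is what the REST handler would ask the state for.
    Optional<Long> query(String name) {
        return Optional.ofNullable(totalByName.get(name));
    }

    public static void main(String[] args) {
        KeyedWeightModel model = new KeyedWeightModel();
        model.onEvent("{\"id\":\"1\",\"PersonName\":\"John\",\"weight\":\"34\"}");
        model.onEvent("{\"id\":\"2\",\"PersonName\":\"Mann\",\"weight\":\"88\"}");
        model.onEvent("{\"id\":\"4\",\"PersonName\":\"John\",\"weight\":\"44\"}");
        System.out.println(model.query("John"));  // prints Optional[78]
        System.out.println(model.query("Chris")); // prints Optional.empty
    }
}
```

Each event touches only its own key's total, so answering a request is a cheap lookup rather than a re-read of Kafka. In the real job, keyBy is also what helps with the scaling requirement: records for different names can be processed by different parallel instances, each holding only the state for its own keys.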

Upvotes: 2
