observer0107

Reputation: 21

Parsing JSON from an incoming data stream to perform simple transformations in Flink

I am using Apache Flink with AWS Kinesis for the first time. My objective is to transform the incoming data from a Kinesis stream so that I can perform simple operations on it, such as filtering and aggregating.

I am adding the source as follows:

return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

When I print the incoming stream, I get JSON data as expected:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();

This is a sample of the printed output:

{"event_num": "5530", "timestmap": "2019-03-04 14:29:44.882376", "amount": "80.4", "type": "Purchase"}
{"event_num": "5531", "timestmap": "2019-03-04 14:29:44.881379", "amount": "11.98", "type": "Service"}

Can someone explain how I can access these JSON fields so that I can perform simple transformations, such as selecting only the records whose "type" is "Service"?

Upvotes: 2

Views: 3856

Answers (1)

Dawid Wysakowicz

Reputation: 3422

Because you are using SimpleStringSchema, the resulting stream of events is of type String (a DataStream<String>). You therefore need to parse each string into a JSON structure first; only then can you apply filters, aggregations, etc.
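A minimal sketch of "parse first, then filter", assuming jackson-databind (com.fasterxml.jackson) is on the job's classpath. The class name ServiceFilterJob and the helper isService are hypothetical, and env.fromElements with the two sample records stands in for createSourceFromStaticConfig(env) so the job can run locally without Kinesis:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ServiceFilterJob {

    // Static so the mapper is created once per JVM and is not serialized with the filter function.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper: parses one JSON string and checks whether its "type" field is "Service".
    // path() returns a missing node (asText() == "") when the field is absent, so no null check is needed.
    static boolean isService(String json) throws Exception {
        JsonNode node = MAPPER.readTree(json);
        return "Service".equals(node.path("type").asText());
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kinesis source: the two sample records from the question.
        DataStream<String> input = env.fromElements(
            "{\"event_num\": \"5530\", \"timestmap\": \"2019-03-04 14:29:44.882376\", \"amount\": \"80.4\", \"type\": \"Purchase\"}",
            "{\"event_num\": \"5531\", \"timestmap\": \"2019-03-04 14:29:44.881379\", \"amount\": \"11.98\", \"type\": \"Service\"}");

        // Each String is parsed inside the filter; only "Service" records pass through.
        DataStream<String> services = input.filter(ServiceFilterJob::isService);
        services.print();

        env.execute("Filter Service records");
    }
}
```

Note that a malformed record makes readTree throw, which fails the job; if bad input is expected, you might catch the exception in isService and return false instead.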

Alternatively, you may want to have a look at JsonNodeDeserializationSchema, which deserializes each record directly into an ObjectNode, so no separate parsing step is needed.
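A sketch of that alternative, assuming the flink-json and flink-connector-kinesis modules are on the classpath and a Flink 1.x version in which org.apache.flink.formats.json.JsonNodeDeserializationSchema produces Flink's shaded Jackson ObjectNode (it may be deprecated in newer releases, so check the docs for your version). The class name, stream name, region, and the isService helper are hypothetical placeholders:

```java
import java.util.Properties;

import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisJsonJob {

    // Hypothetical helper: true if the record's "type" field is "Service" ("" when the field is missing).
    static boolean isService(ObjectNode node) {
        return "Service".equals(node.path("type").asText());
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stream name and region; replace with your own configuration.
        String inputStreamName = "my-input-stream";
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        // Each Kinesis record is deserialized straight into an ObjectNode instead of a String.
        DataStream<ObjectNode> input = env.addSource(
            new FlinkKinesisConsumer<>(inputStreamName, new JsonNodeDeserializationSchema(), inputProperties));

        // Fields can now be read directly, e.g. keep only "Service" records.
        input.filter(KinesisJsonJob::isService)
             .print();

        env.execute("Kinesis JSON filter");
    }
}
```

Compared with SimpleStringSchema plus manual parsing, this moves JSON parsing into the source, so downstream operators work on structured nodes from the start.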

Upvotes: 1
