Reputation: 21
I am using Apache Flink with AWS Kinesis for the first time. My objective is to read incoming data from a Kinesis stream and perform simple transformations on it, such as filtering and aggregating.
I am adding the source as follows:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
When I print the incoming stream, I get JSON data as expected:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();
This is a sample result of the print:
{"event_num": "5530", "timestmap": "2019-03-04 14:29:44.882376", "amount": "80.4", "type": "Purchase"}
{"event_num": "5531", "timestmap": "2019-03-04 14:29:44.881379", "amount": "11.98", "type": "Service"}
How can I access the individual JSON fields so that I can perform simple transformations, such as selecting only the records whose type is "Service"?
Upvotes: 2
Views: 3856
Reputation: 3422
As you are using SimpleStringSchema, the resulting stream of events is of type String. Therefore you need to parse each string first, and then you can apply filters etc.
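The parse-then-filter idea can be sketched roughly as below. This is only an illustration: the names ServiceFilter, typeOf and isService are hypothetical, and the regular expression is a stand-in for a real JSON parser (such as Jackson) that only copes with flat objects whose values are plain strings, like the sample records in the question.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: decides whether a JSON record has type "Service".
public class ServiceFilter {

    // Assumption: records are flat JSON objects with string values, so a regex
    // is enough to pull out the "type" field. Use a JSON library for anything richer.
    private static final Pattern TYPE_FIELD =
            Pattern.compile("\"type\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the value of the "type" field, or empty if the record has none.
    public static Optional<String> typeOf(String json) {
        Matcher m = TYPE_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // True only for records whose "type" field is exactly "Service".
    public static boolean isService(String json) {
        return typeOf(json).map("Service"::equals).orElse(false);
    }

    public static void main(String[] args) {
        // The two sample records from the question.
        List<String> records = List.of(
                "{\"event_num\": \"5530\", \"timestmap\": \"2019-03-04 14:29:44.882376\", \"amount\": \"80.4\", \"type\": \"Purchase\"}",
                "{\"event_num\": \"5531\", \"timestmap\": \"2019-03-04 14:29:44.881379\", \"amount\": \"11.98\", \"type\": \"Service\"}");

        // Keep only the "Service" records and print them.
        List<String> services = records.stream()
                .filter(ServiceFilter::isService)
                .collect(Collectors.toList());
        services.forEach(System.out::println);
    }
}
```

In the Flink job itself, the same predicate could presumably be passed to the stream's filter operator, e.g. input.filter(ServiceFilter::isService), with the regex swapped for proper JSON parsing.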
Alternatively, you may want to have a look at JsonNodeDeserializationSchema, which will produce ObjectNode elements instead of strings.
Upvotes: 1