Reputation: 423
I want to read a string in JSON format, e.g.
{"a":1, "b":2}
using Flink, and then extract a specific value by its key, say 1.
I am following the Kafka connector documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
Here is what I have so far:
val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties
)

val messageStream = env.addSource(kafkaConsumer)
But I am not sure how to proceed from here. The link above says I can use objectNode.get("field").as(Int/String/…)()
to extract a specific value by key, but how do I actually do that?
Or is there a completely different way to achieve what I want?
Thanks!
Upvotes: 1
Views: 2861
Reputation: 1294
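Apply a map transformation to the stream coming from Kafka, like this:
messageStream.map(new MapFunction<ObjectNode, Object>() {
    @Override
    public Object map(ObjectNode value) throws Exception {
        // as(...) is the shorthand used in the Flink docs; Jackson's actual
        // accessors are methods such as asInt() or asText()
        return value.get("field").as(...);
    }
})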
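Since the question's code is in Scala, the same idea might look roughly like the sketch below. It rests on a few assumptions not stated in the thread. As far as I know, JSONKeyValueDeserializationSchema wraps each record in an ObjectNode with "key" and "value" fields, so the payload {"a":1, "b":2} would sit under "value". The Jackson import is the unshaded one, which I believe matches Flink 1.2; newer Flink releases relocate Jackson under a shaded package. The object name ExtractJsonField and the job name are made up for illustration.

```scala
import com.fasterxml.jackson.databind.node.ObjectNode // assumption: unshaded Jackson (Flink 1.2-era)
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

// Hypothetical job object; the name is illustrative only.
object ExtractJsonField {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Same source as in the question; `false` means no Kafka metadata is attached.
    val kafkaConsumer = new FlinkKafkaConsumer010[ObjectNode](
      params.getRequired("input-topic"),
      new JSONKeyValueDeserializationSchema(false),
      params.getProperties
    )

    val messageStream: DataStream[ObjectNode] = env.addSource(kafkaConsumer)

    // Assumption: the JSON payload is stored under the "value" field of the wrapper node.
    // get("a") returns null if the field is absent, so this fails on records without "a".
    val aValues: DataStream[Int] =
      messageStream.map(node => node.get("value").get("a").asInt())

    aValues.print()
    env.execute("extract-json-field") // hypothetical job name
  }
}
```

For string fields, asText() plays the same role as asInt() here. If some records may lack the field, Jackson's path("a") returns a "missing" node instead of null, which may be the safer choice.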
Upvotes: 2