teddy

Reputation: 423

Parse a JSON-formatted string from Kafka using Flink

What I want to do is read a JSON-formatted string, e.g.

{"a":1, "b":2}

using Flink, and then extract a specific value by its key, e.g. get 1 for the key "a".

I am following the Kafka connector docs: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

What I have done is:

val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties
)

val messageStream = env.addSource(kafkaConsumer)

But I am not sure how to proceed from here. The docs linked above say I can use objectNode.get("field").as(Int/String/...)() to extract a specific value by key, but how do I actually do that?

Or is there a completely different way to achieve what I want?

Thanks!

Upvotes: 1

Views: 2861

Answers (1)

BrightFlow

Reputation: 1294

Apply a map transformation to the stream read from Kafka, like this:

messageStream.map(new MapFunction<ObjectNode, Object>() {
    @Override
    public Object map(ObjectNode value) throws Exception {
        // Pick the accessor that matches the field's type, e.g. asInt() or asText().
        return value.get("field").as(...);
    }
})
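
One detail that is easy to miss: JSONKeyValueDeserializationSchema does not hand you the message body directly. It wraps each record in an ObjectNode with the Kafka key under "key" and the message body under "value", so the field from the question sits one level down. Below is a minimal sketch of that lookup using plain Jackson, outside of Flink. The class name ExtractField and the helpers envelope and extractInt are made up for illustration, and it assumes jackson-databind is on the classpath.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;

public class ExtractField {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper: builds a node shaped like the records that
    // JSONKeyValueDeserializationSchema(false) emits, i.e. the Kafka key
    // under "key" and the message body under "value".
    public static ObjectNode envelope(String keyJson, String valueJson) {
        try {
            ObjectNode node = MAPPER.createObjectNode();
            node.set("key", MAPPER.readTree(keyJson));
            node.set("value", MAPPER.readTree(valueJson));
            return node;
        } catch (IOException e) {
            throw new IllegalArgumentException("invalid JSON", e);
        }
    }

    // Hypothetical helper: the logic you would put inside map().
    // Step into "value" first, then read the field as an int.
    public static int extractInt(ObjectNode record, String field) {
        JsonNode payload = record.get("value");
        return payload.get(field).asInt();
    }

    public static void main(String[] args) {
        ObjectNode record = envelope("\"some-key\"", "{\"a\":1, \"b\":2}");
        System.out.println(extractInt(record, "a")); // prints 1
    }
}
```

Inside the Flink job, the body of map() would then be something like return value.get("value").get("a").asInt(); with the return type changed to Integer. Depending on your Flink version, the ObjectNode import may need to come from Flink's shaded Jackson package rather than com.fasterxml.jackson, so check what your connector version exposes.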

Upvotes: 2
