Shekar Tippur

Reputation: 165

Apache Flink - Serialize json and perform join operation

I am trying to use the Jackson library to read a String from a Kafka topic and join the resulting stream with another stream.

Here is sample code with two streams of data. I want to perform a join operation on these two message streams.

Say for example, the incoming streams are:

messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}

The join criterion is messageStream1."A" = messageStream2."B". How do I implement this in Flink?

DataStream 1:

DataStream<String> messageStream1 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

DataStream<JsonNode> parsedStream1 = messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode rootNode = mapper.readTree(value);
            // Log each top-level field of the incoming JSON record.
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue: " + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});

DataStream 2:

DataStream<String> messageStream2 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

DataStream<JsonNode> parsedStream2 = messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode rootNode = mapper.readTree(value);
            // Log each top-level field of the incoming JSON record.
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue: " + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});

Upvotes: 1

Views: 3079

Answers (1)

Matthias J. Sax

Reputation: 62350

You need to extract the key field into an extra attribute so that Flink can access it (an alternative would be to provide a custom key selector: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys).

Thus, the return type of your map(...) might be Tuple2<String,JsonNode> (if String is the correct type of your join attribute).
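A minimal, standalone sketch of this key-extraction step: the helper below pulls the join key out of a flat JSON string and pairs it with the record, using a naive string scan and Map.Entry in place of Jackson and Flink's Tuple2 so it can run without either dependency. The field name "A" comes from the question; everything else (class and method names) is made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class KeyExtractor {
    // Naive extraction of a single string-valued field from a flat JSON object;
    // in the real job this would be mapper.readTree(value).get(field).asText().
    static String extractField(String json, String field) {
        String needle = "\"" + field + "\":\"";
        int start = json.indexOf(needle);
        if (start < 0) return null;
        start += needle.length();
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }

    // Mirrors the map(...) step: pair the join key with the record,
    // as Tuple2<String, JsonNode> would in the Flink job.
    static Map.Entry<String, String> toKeyedRecord(String json, String field) {
        return new SimpleEntry<>(extractField(json, field), json);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> rec = toKeyedRecord("{\"A\":\"a\"}", "A");
        System.out.println(rec.getKey() + " -> " + rec.getValue());
    }
}
```

With the key in the first tuple position, where(0) and equalTo(0) in the join below can refer to it directly.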

Then, you can specify your join as described in the documentation (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html):

messageStream1.join(messageStream2)
    .where(0).equalTo(0) // join on index 0 of each Tuple2, i.e., the String key attribute
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction() {...});

In order to perform a join using DataStream API, you also need to specify a join window. Only tuples that belong to the same window can be joined.
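The window semantics can be illustrated with a small stdlib-only simulation (this is not Flink code; the Event record and join method are invented for the demonstration): records from both streams carry a window index and a join key, and a pair is emitted only when both match.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedJoinDemo {
    // A record tagged with the tumbling window it falls into and its join key.
    record Event(long window, String key, String payload) {}

    // Emit a pair only when the keys are equal AND both events
    // belong to the same window, mirroring the DataStream join above.
    static List<String> join(List<Event> left, List<Event> right) {
        List<String> out = new ArrayList<>();
        for (Event l : left)
            for (Event r : right)
                if (l.window() == r.window() && l.key().equals(r.key()))
                    out.add(l.payload() + "|" + r.payload());
        return out;
    }

    public static void main(String[] args) {
        List<Event> s1 = List.of(new Event(0, "a", "{\"A\":\"a\"}"),
                                 new Event(1, "b", "{\"A\":\"b\"}"));
        List<Event> s2 = List.of(new Event(0, "a", "{\"B\":\"a\"}"),
                                 new Event(0, "b", "{\"B\":\"b\"}"));
        // Only the (window 0, key "a") pair joins; the key "b" events
        // fall into different windows and therefore produce no result.
        System.out.println(join(s1, s2));
    }
}
```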

Upvotes: 2
