Reputation: 165
I am trying to use the Jackson library to read a String from a Kafka topic and join it with another stream.
Here is some sample code with two streams of data. I want to perform a join operation on these two message streams.
For example, say the incoming streams are:
messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}
The join criterion is messageStream1."A" = messageStream2."B". How do I implement this in Flink?
DataStream 1:
DataStream<String> messageStream1 = env.addSource(
    new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});
DataStream 2:
DataStream<String> messageStream2 = env.addSource(
    new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});
Upvotes: 1
Views: 3079
Reputation: 62350
You need to extract the key field into a separate attribute so that Flink can access it (an alternative would be to provide a custom key selector: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys).
Thus, the return type of your map(...) might be Tuple2<String,JsonNode> (if String is the correct type of your join attribute).
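To make the shape of that mapping concrete, here is a minimal plain-Java sketch. It is not Flink or Jackson code: Map<String, String> is an assumed stand-in for a parsed JsonNode, Map.Entry stands in for Tuple2, and the names KeyExtractionSketch and withKey are hypothetical. What it illustrates is that stream 1 is keyed on "A" and stream 2 on "B", so after mapping both sides carry the join value in the same position.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.Map;

// Plain-Java illustration only; none of these names come from Flink or Jackson.
class KeyExtractionSketch {

    // Pairs a record with the value of its join field, mirroring a
    // Tuple2<String, JsonNode> whose attribute 0 is the join key.
    // Map<String, String> is an assumed stand-in for a parsed JsonNode.
    public static Map.Entry<String, Map<String, String>> withKey(
            Map<String, String> record, String joinField) {
        String key = record.get(joinField);
        if (key == null) {
            // Fail loudly instead of emitting a record without a key.
            throw new IllegalArgumentException("missing join field: " + joinField);
        }
        return new SimpleImmutableEntry<>(key, record);
    }

    public static void main(String[] args) {
        Map<String, String> fromStream1 = Collections.singletonMap("A", "a");
        Map<String, String> fromStream2 = Collections.singletonMap("B", "a");

        // Each side names its own join field, but the key ends up in the same slot.
        Map.Entry<String, Map<String, String>> left = withKey(fromStream1, "A");
        Map.Entry<String, Map<String, String>> right = withKey(fromStream2, "B");

        System.out.println(left.getKey().equals(right.getKey())); // prints "true"
    }
}
```

In an actual job the same logic would live inside the map(...) function of each stream, with the field name ("A" or "B") being the only difference between the two.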
Then, you can specify your join as described in the documentation (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html):
messageStream1.join(messageStream2)
    .where(0).equalTo(0) // both zeros indicate that the join happens on the zeroth attribute, i.e., the String attribute of Tuple2
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction() {...});
To perform a join using the DataStream API, you also need to specify a join window. Only tuples that belong to the same window can be joined.
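The effect of the window can be illustrated with a small plain-Java model. This is not Flink code: it assumes tumbling windows of 3000 ms aligned at timestamp 0, and the Event class, windowOf and join are hypothetical names. Two events are joined only if their keys match and their timestamps fall into the same window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a tumbling-window join; not the Flink API.
class TumblingWindowJoinSketch {

    static final long WINDOW_MS = 3000L; // assumed window size (3 seconds)

    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Index of the tumbling window containing the timestamp
    // (non-negative timestamps assumed).
    static long windowOf(long timestampMs) {
        return timestampMs / WINDOW_MS;
    }

    // Emits one result per pair with equal keys in the same window.
    static List<String> join(List<Event> left, List<Event> right) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key)
                        && windowOf(l.timestampMs) == windowOf(r.timestampMs)) {
                    out.add(l.key + "@" + l.timestampMs + "+" + r.timestampMs);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event("a", 1000), new Event("a", 2900));
        List<Event> right = Arrays.asList(new Event("a", 2000), new Event("a", 3100));

        // prints [a@1000+2000, a@2900+2000]
        System.out.println(join(left, right));
    }
}
```

Note that the events at 2900 ms and 3100 ms are only 200 ms apart but are not joined, because they fall on different sides of the window boundary at 3000 ms.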
Upvotes: 2