Reputation: 423
My goal is to use kafka to read in a string in json format, do a filter to the string, select part of the message and sink the message out (still in json string format).
For testing purpose, my input string message looks like:
{"a":1,"b":2,"c":"3"}
And my code of implementation is:
def main(args: Array[String]): Unit = {
val inputProperties = new Properties()
inputProperties.setProperty("bootstrap.servers", "localhost:9092")
inputProperties.setProperty("group.id", "myTest2")
val inputTopic = "test"
val outputProperties = new Properties()
outputProperties.setProperty("bootstrap.servers", "localhost:9092")
val outputTopic = "test2"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
inputTopic,
new JSONDeserializationSchema(),
inputProperties)
val messageStream : DataStream[ObjectNode]= env
.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a")
.asText.equals("1") && node.get("b").asText.equals("2"))
// Need help in this part, how to extract for instance a,c and
// get something like {"a":"1", "c":"3"}?
val testStream:DataStream[JsonNode] = filteredStream.map(
node => {
node.get("a")
}
)
testStream.addSink(new FlinkKafkaProducer010[JsonNode](
outputTopic,
new SerializationSchema[JsonNode] {
override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes()
}, outputProperties
))
env.execute("Kafka 0.10 Example")
}
As shown in the comment of this code, I am not sure how to properly select part of the message. I use map, but I don't know how to concatenate the whole message. For instance, what I did in the code can only give me a result as "1", but what I want is {"a":1, "c":"3"}
Or maybe there is a completely different way to solve this problem. The thing is in spark streaming there is a "select" API, however I cannot find it in Flink.
And thanks a lot for flink community's help! This is the last feature I would like to achieve in this small project.
Upvotes: 0
Views: 1891
Reputation: 1294
Flink Streaming job processes each input one time and output it to the next task or save them onto external storage.
One way is save all the outputs into external storage, like HDFS. After streaming job is done, using a batch job to combine them into a JSON.
Another way is to use state and RichMapFunction to get the JSON containing all the key-values.
stream.map(new MapFunction<String, Tuple2<String, String>>() {
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<String, String>("mock", value);
}
}).keyBy(0).map(new RichMapFunction<Tuple2<String,String>, String>() {
@Override
public String map(Tuple2<String, String> value) throws Exception {
ValueState<String> old = getRuntimeContext().getState(new ValueStateDescriptor<String>("test", String.class));
String newVal = old.value();
if (newVal != null) makeJSON(newVal, value.f1);
else newVal = value.f1;
old.update(newVal);
return newVal;
}
}).print();
And use this map function: filteredStream.map(function);
Note that when using state, you will see output like this: {"a": 1}, {"a": 1, "c": 3}. The last output should be what you want.
Upvotes: 1