Reputation: 163
I am trying to validate JSONObject with set of rules if the json matches with set of rules is it will return the matched rule and the JSONObject if not it will return a JSONObject to Sideoutput all this is processed in a ProcessFuntion, i am getting the main output but unable to capture the side output
SideOutput Stream is defined as below
public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
"unmatched-side-output") {};
ProcessFunction is defined as below
public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
public void processElement(Tuple2<String, org.json.JSONObject> value,
ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {
if(this.value.matches((value.f1))) {
out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
}else {
ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
}
}
}
I am printing main Datastream output as below
DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
@Override
public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
return new Tuple2<>(value, input);
}
}).process(new RuleFilter()).print("MatchedJSON=>");
matchedJSON .print("matchedJSON=>");
I am printing Sideoutput as below
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
@Override
public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
return value.f1;
}
})).getSideOutput(unMatchedJSONSideOutput );
unmatchedJSON.print("unmatchedJSON=>");
Main Stream is printing output but the sideoutput is not printing for invalid json please help in solving the issue
Upvotes: 2
Views: 970
Reputation: 43524
The problem is here:
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
.getSideOutput(unMatchedJSONSideOutput);
You should be calling getSideOutput
directly on matchedJSON
, rather than on the result of applying a MapFunction
to it. Only a ProcessFunction
can have a side output, and it needs to come directly from the ProcessFunction
. You tricked the compiler into accepting this by casting the output stream from the map, but the runtime can't do anything meaningful with this.
Upvotes: 1