YRK

Reputation: 163

Flink ProcessFunction is not returning data to the side output stream

I am trying to validate a JSONObject against a set of rules. If the JSON matches a rule, the function should return the matched rule together with the JSONObject; if not, it should send the JSONObject to a side output. All of this is done in a ProcessFunction. I am getting the main output, but I am unable to capture the side output.

The side output is defined as below

public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
            "unmatched-side-output") {};

ProcessFunction is defined as below

public class RuleFilter extends ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>> {
    @Override
    public void processElement(Tuple2<String, org.json.JSONObject> value,
            ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
            Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {

        if (this.value.matches(value.f1)) {
            out.collect(new Tuple2<String, org.json.JSONObject>(value.f0, value.f1));
        } else {
            ctx.output(RuleMatching.unMatchedJSONSideOutput, value.f1);
        }
    }
}

I am printing main Datastream output as below

    DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
                            inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
                                @Override
                                public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
                                    return new Tuple2<>(value, input);
                                }
                            }).process(new RuleFilter());

matchedJSON.print("matchedJSON=>");

I am printing the side output as below

DataStream<org.json.JSONObject> unmatchedJSON =
                        ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
                            @Override
                            public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
                                return value.f1;
                            }
                        })).getSideOutput(unMatchedJSONSideOutput );

                unmatchedJSON.print("unmatchedJSON=>");

The main stream prints its output, but the side output prints nothing for invalid JSON. Please help in solving the issue.

Upvotes: 2

Views: 970

Answers (1)

David Anderson

Reputation: 43524

The problem is here:

DataStream<org.json.JSONObject> unmatchedJSON =
    ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
    .getSideOutput(unMatchedJSONSideOutput);

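You should be calling getSideOutput directly on matchedJSON, rather than on the result of applying a MapFunction to it. A side output can only be emitted by a process function (ProcessFunction and its variants), and it has to be read from the stream returned by the operator that emitted it. Since getSideOutput is defined on SingleOutputStreamOperator, matchedJSON needs to be declared with that type rather than as a plain DataStream. You tricked the compiler into accepting this by casting the output stream from the map, but the runtime can't do anything meaningful with it.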
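As a rough illustration of that advice, here is a minimal sketch that reads the side output straight from the operator returned by process(...). It is not the asker's actual job: the class name SideOutputSketch, the String element type (used instead of org.json.JSONObject to keep the dependencies small), the fromElements test input, and the matches helper are all assumptions made for the example. It is written against the Flink 1.x DataStream API.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // Created as an anonymous subclass ({}) so Flink can capture the generic type.
    static final OutputTag<String> UNMATCHED =
            new OutputTag<String>("unmatched-side-output") {};

    // Hypothetical stand-in for the rule check in the question.
    static boolean matches(String json) {
        return json.contains("\"valid\":true");
    }

    // Matching records go to the main output, everything else to the side output.
    static class RuleFilter extends ProcessFunction<String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (matches(value)) {
                out.collect(value);
            } else {
                ctx.output(UNMATCHED, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the SingleOutputStreamOperator type: getSideOutput is declared there.
        SingleOutputStreamOperator<String> matched = env
                .fromElements("{\"valid\":true}", "{\"valid\":false}")
                .process(new RuleFilter());

        // Read the side output from the process operator itself,
        // not from a map(...) applied downstream of it.
        DataStream<String> unmatched = matched.getSideOutput(UNMATCHED);

        matched.print("matchedJSON=>");
        unmatched.print("unmatchedJSON=>");

        env.execute("side-output-sketch");
    }
}
```

Any further map(...) steps can then be applied separately to matched and unmatched once both streams have been obtained from the process operator.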

Upvotes: 1
