Reputation: 7
Source code: the source emits a String, and I try to map it into a JSONObject, but it fails.
public class Test {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Create a source that emits a single JSON string
        DataStreamSource<String> inputDS = env.fromElements("{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}");
        // 3. Parse each string into a JSONObject and emit it
        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject);
            }
        });
        jsonObjDS.print();
        env.execute();
    }
}
Error: Assigned key must not be null!
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!
Process finished with exit code 1
I tried map, flatMap, and process. They all fail at the point where the collector collects the element. While debugging, I found that the String has already been parsed into a JSONObject; it just cannot be collected.
Upvotes: 0
Views: 595
Reputation: 43612
To get it to compile, I had to change the code slightly, to
public void processElement(
        String value,
        ProcessFunction<String, JSONObject>.Context ctx,
        Collector<JSONObject> out) throws Exception {
    // org.json: build the JSONObject directly from the JSON text
    JSONObject jsonObject = new JSONObject(value);
    out.collect(jsonObject);
}
and it works fine. It prints out
{"bill_info":{"ADDER_NAME":"sss","ADDER_NO":"0706","UPDATER_NAME":"ssss","UPDATER_NO":"0706","BILL_ID":"8687b584-038c-498c-8f97-ec1ca197da96","ADD_TIME":"2022-11-12 16:05:28:418","ORDER_ID":"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67","S_USER_ID":"s68","B_USER_ID":"b77","UPDATE_TIME":"2022-11-12 16:05:28:418"}}
I used
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version>
</dependency>
Note, however, that it's generally not a good idea to send JSONObjects around in a pipeline: they are expensive to serialize and deserialize. It would be better to deserialize the JSON to a POJO, along the lines of what is shown in this recipe.
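As a rough illustration of the POJO idea, here is a minimal sketch of a class shaped so that Flink can treat it as a POJO: it is public, has a public no-argument constructor, and exposes its fields publicly. The names BillInfoSketch and BillInfo, the camel-case field names, and the choice of only four fields are my assumptions based on the sample payload in the question; they are not taken from the recipe. Mapping the JSON text onto the class would normally be done with a JSON library such as Jackson, which is left out here so the sketch has no dependencies.

```java
import java.util.Objects;

public class BillInfoSketch {

    /**
     * Hypothetical POJO for the "bill_info" object in the question's payload.
     * It is public, static (not an inner class), has a public no-argument
     * constructor, and uses public non-final fields.
     */
    public static class BillInfo {
        public String orderId;   // from "ORDER_ID"
        public String billId;    // from "BILL_ID"
        public String addTime;   // from "ADD_TIME", kept as text in this sketch
        public String adderName; // from "ADDER_NAME"

        // Public no-argument constructor, needed for POJO handling.
        public BillInfo() {}

        public BillInfo(String orderId, String billId, String addTime, String adderName) {
            this.orderId = orderId;
            this.billId = billId;
            this.addTime = addTime;
            this.adderName = adderName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof BillInfo)) return false;
            BillInfo other = (BillInfo) o;
            return Objects.equals(orderId, other.orderId)
                    && Objects.equals(billId, other.billId)
                    && Objects.equals(addTime, other.addTime)
                    && Objects.equals(adderName, other.adderName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(orderId, billId, addTime, adderName);
        }

        @Override
        public String toString() {
            return "BillInfo{orderId=" + orderId
                    + ", billId=" + billId
                    + ", addTime=" + addTime
                    + ", adderName=" + adderName + "}";
        }
    }

    public static void main(String[] args) {
        // Values copied from the sample payload in the question.
        BillInfo bill = new BillInfo(
                "f3e60c5f-78f6-4ec6-bab6-98b177f7cb67",
                "8687b584-038c-498c-8f97-ec1ca197da96",
                "2022-11-12 16:05:28:418",
                "sss");
        System.out.println(bill);
    }
}
```

With a type like this, the stream would be declared as a stream of BillInfo rather than JSONObject, and the parsing step would fill in the fields instead of passing a generic JSON tree downstream.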
Upvotes: 1