Varun Sharma

Reputation: 39

JSON validation in Apache Beam using Google Cloud Dataflow

I am trying to write a Filter transformation using the Apache Beam Java SDK, and I need to filter out invalid JSON messages.

If I create a new Gson object for every element validation, the implementation works fine. However, I want to avoid creating a Gson object for every element (throughput is 1K/second) while still validating the JSON.

I am creating a constant Gson object up front and initializing it in a static block. This approach is not working. I am not sure why the same object cannot be used to parse multiple elements, since we are not changing the object's state during processing.

// Gson object declared as constant
private static final Gson gsonObj=new Gson();

// Initialized GSon object during class loading before main method invocation
static {
    gsonObj = new Gson();
}

....

/*
enum to validate json messages.
 */
enum InputValidation implements SerializableFunction<String, Boolean> {
    VALID {
        @Override
        public Boolean apply(String input) {
            try {
                gsonObj.fromJson(input, Object.class);
                return true;
            } catch(com.google.gson.JsonSyntaxException ex) {
                return false;
            }
        }
    }
}

Upvotes: 0

Views: 1083

Answers (1)

miles212

Reputation: 383

Instead of 'enum InputValidation implements ...', use a multi-output ParDo with TupleTags to separate the records. The code below routes parseable JSON rows to one output and unparseable rows to another.

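import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Class-level field: static fields are not serialized with the DoFn,
// so each worker JVM creates its own single Gson instance.
private static final Gson gsonObj = new Gson();

Pipeline p = Pipeline.create(options);

// The trailing {} creates anonymous subclasses so Beam can retain the element type.
final TupleTag<String> successParse = new TupleTag<String>(){};
final TupleTag<String> failParse = new TupleTag<String>(){};

// input is the PCollection<String> of raw messages.
PCollectionTuple results = input.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            gsonObj.fromJson(c.element(), Object.class);
            c.output(successParse, c.element());
        } catch (JsonSyntaxException e) {
            c.output(failParse, c.element());
        }
    }
}).withOutputTags(successParse, TupleTagList.of(failParse)));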

The code above worked in my case, and I found it the best way to filter out the records.
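On the static-block attempt in the question: Java allows a static final field to be assigned only once, either at its declaration or in a static initializer, so doing both does not compile. Below is a minimal sketch of the single-assignment form. StandInParser and SharedParserDemo are hypothetical names of mine, used in place of Gson only so the snippet runs without dependencies; the brace check is a toy, not real JSON validation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Gson so the sketch runs without dependencies.
// It is NOT a real JSON validator.
class StandInParser {
    // Counts how many parser instances have been constructed.
    static final AtomicInteger created = new AtomicInteger();

    StandInParser() {
        created.incrementAndGet();
    }

    // Toy check only: treats any text wrapped in braces as "parseable".
    boolean parses(String input) {
        String s = input.trim();
        return s.startsWith("{") && s.endsWith("}");
    }
}

class SharedParserDemo {
    // Assigned exactly once, at the declaration. Assigning it again in a
    // static { ... } block would be rejected by the compiler.
    private static final StandInParser PARSER = new StandInParser();

    // Every call reuses the same shared instance.
    static boolean isValid(String input) {
        return PARSER.parses(input);
    }
}
```

Calling SharedParserDemo.isValid(...) any number of times constructs the parser only once per JVM, which is the same reuse the static Gson field gives you on each worker.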

Here is the official documentation example.

Upvotes: 1
