nomadic_squirrel
nomadic_squirrel

Reputation: 644

NullPointerException when outputting TableRow with null value

I am trying to build a TableRow object to eventually be written to a BigQuery table, but I get a NullPointerException if I include a null value in the row. This is the full stacktrace:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at dataflowsandbox.StarterPipeline.runTest(StarterPipeline.java:224)
    at dataflowsandbox.StarterPipeline.main(StarterPipeline.java:83)
Caused by: java.lang.NullPointerException
    at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
    at java.util.AbstractMap.hashCode(AbstractMap.java:530)
    at java.util.Arrays.hashCode(Arrays.java:4146)
    at java.util.Objects.hash(Objects.java:128)
    at org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow.hashCode(WindowedValue.java:245)
    at java.util.HashMap.hash(HashMap.java:339)
    at java.util.HashMap.get(HashMap.java:557)
    at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:191)
    at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:130)
    at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.HashMultimap.put(HashMultimap.java:48)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at dataflowsandbox.StarterPipeline$6.procesElement(StarterPipeline.java:202)

Process finished with exit code 1

This is the code that triggers the NullPointerException:

  Pipeline p = Pipeline.create( options );

  p.apply( "kicker", Create.of( "Kick!" ) )
  .apply( "Read values", ParDo.of( new DoFn<String, TableRow>() {
     @ProcessElement
     public void procesElement( ProcessContext c ) {

        TableRow row = new TableRow();

        row.set( "ev_id",       "2323423423" );
        row.set( "customer_id", "111111"     );
        row.set( "org_id",      null         ); // Without this line, no NPE
        c.output( row );  


     } }) )
     .apply( BigQueryIO.writeTableRows()
        .to( DATA_TABLE_OUT )
        .withCreateDisposition( CREATE_NEVER )
        .withWriteDisposition( WRITE_APPEND ) );

  PipelineResult result = p.run();

My actual code is a little more complicated, but I should be able to catch the null value and just not set it in the row, but maybe I don't understand something about TableRows.

Upvotes: 1

Views: 2176

Answers (3)

Sruthi Subramanyam
Sruthi Subramanyam

Reputation: 41

If you are using DirectRunner use the parameter --enforceImmutability=false. It worked for me. This issue has been handled by Dataflow Runner but when DirectRunner is used we encounter the NPE if null is passed to tableRow.set(). If we turn off the DirectRunner's ImmutabilityEnforcement check by setting the --enforceImmutability=false pipeline option, the error is no longer seen.

Ref: https://issues.apache.org/jira/browse/BEAM-1714

Upvotes: 4

Guillem Xercavins
Guillem Xercavins

Reputation: 7058

You can, for example, provide the table schema and just omit setting the value of the field.

The table schema, where org_id is NULLABLE:

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ev_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("customer_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("org_id").setType("STRING").setMode("NULLABLE"));
TableSchema schema = new TableSchema().setFields(fields);

Simply do not set any value for that field (comment out that line):

row.set( "ev_id",       "2323423423" );
row.set( "customer_id", "111111"     );
// row.set( "org_id",     None         ); // Without this line, no NPE
c.output( row );  

Pass the table schema in the write step:

.apply( BigQueryIO.writeTableRows()
   .to( DATA_TABLE_OUT )
   .withSchema(schema)
   .withCreateDisposition( CREATE_NEVER )
   .withWriteDisposition( WRITE_APPEND ) );

A NULL value will be written to BigQuery:

enter image description here

Upvotes: 2

olegarino
olegarino

Reputation: 12

Put a temporary value instead of the null or an empty string. As far as I can tell tablerows don't accept null values.

Upvotes: 0

Related Questions