Reputation: 644
I am trying to build a TableRow object to eventually be written to a BigQuery table, but I get a NullPointerException if I include a null value in the row. This is the full stack trace:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at dataflowsandbox.StarterPipeline.runTest(StarterPipeline.java:224)
at dataflowsandbox.StarterPipeline.main(StarterPipeline.java:83)
Caused by: java.lang.NullPointerException
at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
at java.util.AbstractMap.hashCode(AbstractMap.java:530)
at java.util.Arrays.hashCode(Arrays.java:4146)
at java.util.Objects.hash(Objects.java:128)
at org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow.hashCode(WindowedValue.java:245)
at java.util.HashMap.hash(HashMap.java:339)
at java.util.HashMap.get(HashMap.java:557)
at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:191)
at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:130)
at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.HashMultimap.put(HashMultimap.java:48)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
at dataflowsandbox.StarterPipeline$6.procesElement(StarterPipeline.java:202)
Process finished with exit code 1
This is the code that triggers the NullPointerException:
Pipeline p = Pipeline.create( options );
p.apply( "kicker", Create.of( "Kick!" ) )
    .apply( "Read values", ParDo.of( new DoFn<String, TableRow>() {
        @ProcessElement
        public void procesElement( ProcessContext c ) {
            TableRow row = new TableRow();
            row.set( "ev_id", "2323423423" );
            row.set( "customer_id", "111111" );
            row.set( "org_id", null ); // Without this line, no NPE
            c.output( row );
        }
    } ) )
    .apply( BigQueryIO.writeTableRows()
        .to( DATA_TABLE_OUT )
        .withCreateDisposition( CREATE_NEVER )
        .withWriteDisposition( WRITE_APPEND ) );
PipelineResult result = p.run();
My actual code is a little more complicated, but I should be able to catch the null value and simply not set it in the row. Maybe I don't understand something about TableRows.
Upvotes: 1
Views: 2176
Reputation: 41
If you are using the DirectRunner, pass the pipeline option --enforceImmutability=false; it worked for me. The Dataflow runner handles this case, but with the DirectRunner we hit the NPE when null is passed to tableRow.set(), because the runner's immutability-enforcement check hashes the output element. Turning that check off with the --enforceImmutability=false pipeline option makes the error go away.
Ref: https://issues.apache.org/jira/browse/BEAM-1714
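The same option can also be set programmatically instead of on the command line. A minimal sketch, assuming the Beam direct-runner artifact is on the classpath (DirectOptions lives in the direct-runner module):

```java
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DisableImmutabilityCheck {
    public static void main(String[] args) {
        // Same effect as passing --enforceImmutability=false on the command line.
        DirectOptions options =
                PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
        options.setEnforceImmutability(false);
        Pipeline p = Pipeline.create(options);
        // ... build and run the pipeline as usual
    }
}
```

Note this only silences the DirectRunner's check; it does not make null a valid TableRow value for other code paths.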
Upvotes: 4
Reputation: 7058
You can, for example, provide the table schema and just omit setting the value of the field.
The table schema, where org_id is NULLABLE:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ev_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("customer_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("org_id").setType("STRING").setMode("NULLABLE"));
TableSchema schema = new TableSchema().setFields(fields);
Simply do not set any value for that field (comment out that line):
row.set( "ev_id", "2323423423" );
row.set( "customer_id", "111111" );
// row.set( "org_id", null ); // omitted: setting null triggers the NPE
c.output( row );
Pass the table schema in the write step:
.apply( BigQueryIO.writeTableRows()
.to( DATA_TABLE_OUT )
.withSchema(schema)
.withCreateDisposition( CREATE_NEVER )
.withWriteDisposition( WRITE_APPEND ) );
A NULL value will be written to BigQuery for the omitted field.
Upvotes: 2
Reputation: 12
Put a placeholder value or an empty string instead of the null. As far as I can tell, TableRow doesn't accept null values.
Upvotes: 0