Reputation: 107
I'm getting a NullPointerException in my Dataflow pipeline, but all of the values at the location of the exception are defined correctly. In this code I am reading from a database, doing some conversions on the result set, and then trying to create a table in an existing dataset based on the table rows in that result set. I've confirmed the values passed to the BigQueryIO.writeTableRows() call are all valid, but I still get an exception on the line where I try to carry out the write to BigQuery. The location of the NullPointerException is marked in the code below.
// Gather first query results
WriteResult results = pipeline
    .apply("Connect", JdbcIO.<TableRow>read()
        .withDataSourceConfiguration(buildDataSourceConfig(options, URL))
        .withQuery(query)
        .withRowMapper(new JdbcIO.RowMapper<TableRow>() {
            // Convert each ResultSet row to a TableRow
            public TableRow mapRow(ResultSet rs) throws Exception {
                ResultSetMetaData md = rs.getMetaData();
                int columnCount = md.getColumnCount();
                TableRow tr = new TableRow();
                for (int i = 1; i <= columnCount; i++) {
                    String name = md.getColumnName(i);
                    tr.set(name, rs.getString(name));
                }
                return tr;
            }
        }))
    .setCoder(TableRowJsonCoder.of())
    .apply("Write to BQ", // *** NullPointerException thrown here ***
        BigQueryIO.writeTableRows()
            .withSchema(schema)
            .to(dataset)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
2023-01-10T20:33:22.4214526Z WARNING: Unable to infer a schema for type com.google.api.services.bigquery.model.TableRow. Attempting to infer a coder without a schema.
2023-01-10T20:33:22.4216783Z Exception in thread "main" java.lang.NullPointerException
2023-01-10T20:33:22.4218945Z at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validateNoJsonTypeInSchema(BigQueryIO.java:3035)
2023-01-10T20:33:22.4221029Z at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.continueExpandTyped(BigQueryIO.java:2949)
2023-01-10T20:33:22.4222727Z at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:2880)
2023-01-10T20:33:22.4226464Z at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2776)
2023-01-10T20:33:22.4228072Z at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1786)
2023-01-10T20:33:22.4234778Z at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
2023-01-10T20:33:22.4237961Z at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
2023-01-10T20:33:22.4240010Z at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:376)
2023-01-10T20:33:22.4242466Z at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.buildPipeline(SqaTransfer.java:133)
2023-01-10T20:33:22.4244722Z at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.main(SqaTransfer.java:99)
2023-01-10T20:33:22.4246444Z . exit status 1
Upvotes: 0
Views: 337
Reputation: 6572
The current error is due to a bad schema passed to the withSchema(schema) method of BigQueryIO. The stack trace shows the NullPointerException coming from validateNoJsonTypeInSchema, which inspects the schema's fields while the sink is expanded, so a null or incorrectly built TableSchema fails there during pipeline construction, before any data is processed.
The schema can be created with a TableSchema object:
TableSchema schema =
    new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("string_field")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("int64_field")
                    .setType("INT64")
                    .setMode("NULLABLE"),
                new TableFieldSchema()
                    .setName("float64_field")
                    .setType("FLOAT64"), // default mode is "NULLABLE"
                new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                new TableFieldSchema().setName("bool_field").setType("BOOL"),
                new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                new TableFieldSchema().setName("date_field").setType("DATE"),
                new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                new TableFieldSchema().setName("time_field").setType("TIME"),
                new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                new TableFieldSchema()
                    .setName("array_field")
                    .setType("INT64")
                    .setMode("REPEATED")
                    .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                new TableFieldSchema()
                    .setName("struct_field")
                    .setType("STRUCT")
                    .setDescription(
                        "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                    .setFields(
                        Arrays.asList(
                            new TableFieldSchema().setName("string_value").setType("STRING"),
                            new TableFieldSchema().setName("int64_value").setType("INT64")))));
return schema;
// In the IO.
rows.apply(
    "Write to BigQuery",
    BigQueryIO.writeTableRows()
        .to(String.format("%s:%s.%s", project, dataset, table))
        .withSchema(schema)
        ...
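Since your row mapper copies every column into the TableRow as a string, the schema has to list those same column names. If you don't want to hard-code them, here is a minimal sketch of deriving an all-STRING schema from the JDBC metadata before building the pipeline; the SchemaFromJdbc class, the schemaFor helper, and the plain DriverManager connection are illustrative assumptions, not part of your code:
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.List;

public class SchemaFromJdbc {

    // Hypothetical helper: builds a TableSchema whose columns mirror the
    // query's result set, all typed STRING to match a row mapper that
    // reads every value with rs.getString(...).
    public static TableSchema schemaFor(String jdbcUrl, String query) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement stmt = conn.prepareStatement(query)) {
            // Some drivers return null here until the query is executed;
            // in that case run the query once (e.g. with LIMIT 0) instead.
            ResultSetMetaData md = stmt.getMetaData();
            if (md == null) {
                throw new IllegalStateException(
                    "Driver could not provide metadata without executing the query");
            }
            List<TableFieldSchema> fields = new ArrayList<>();
            for (int i = 1; i <= md.getColumnCount(); i++) {
                fields.add(new TableFieldSchema()
                    .setName(md.getColumnName(i))
                    .setType("STRING")
                    .setMode("NULLABLE"));
            }
            return new TableSchema().setFields(fields);
        }
    }
}
The resulting TableSchema can then be passed to withSchema(...) exactly as in the snippet above.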
You can also use a JSON schema:
String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
// In the IO.
rows.apply(
    "Write to BigQuery",
    BigQueryIO.writeTableRows()
        .to(String.format("%s:%s.%s", project, dataset, table))
        .withJsonSchema(tableSchemaJson)
        ...
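Also note that both snippets pass a fully qualified table reference (project:dataset.table, or at minimum dataset.table) to to(). If the dataset variable in your .to(dataset) call holds only a dataset name rather than a full table spec, the write will still fail once the schema problem is fixed.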
You can check the documentation for more details.
Upvotes: 2