Reputation: 1141
It looks like any code outside the pipeline won't be run on Dataflow. In the following example I get a NullPointerException for TableSchema in the TableRowConverterFn.processElement method. What is the right way to do this with Apache Beam/Dataflow?
private static TableSchema TableSchema;

public static void main(String[] args) {
    try {
        TableSchema = TableSchemaReader.read(TableSchemaResource);
    } catch (IOException e) {
        log.error("Table schema can not be read from {}. Process aborted.", TableSchemaResource);
        return;
    }

    DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
            //.withValidation()
            .as(DataflowDfpOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    Stopwatch sw = Stopwatch.createStarted();
    log.info("DFP data transfer from GS to BQ has started.");

    pipeline.apply("ReadFromStorage", TextIO.read()
            .from("gs://my-test/stream/*.gz")
            .withCompression(Compression.GZIP))
            .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                    .to(options.getTableId())
                    .withMethod(STREAMING_INSERTS)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(WRITE_APPEND)
                    .withSchema(TableSchema)); //todo: use withJsonSchema(String json) instead

    pipeline.run().waitUntilFinish();
    log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}
/**
 * Creates a TableRow from a CSV line.
 */
private static class TableRowConverterFn extends DoFn<String, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        String[] split = c.element().split(",");
        // Ignore the header line. Since this runs in parallel, we can't
        // guarantee that the first line passed to this method is the header.
        if (split[0].equals("Time")) {
            log.info("Skipped header");
            return;
        }
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            // This throws the NPE!
            TableFieldSchema col = TableSchema.getFields().get(i);
            // String is the most common type; checking it first is a slight optimization.
            if (col.getType().equals("STRING")) {
                row.set(col.getName(), split[i]);
            } else if (col.getType().equals("INTEGER")) {
                row.set(col.getName(), Long.valueOf(split[i]));
            } else if (col.getType().equals("BOOLEAN")) {
                row.set(col.getName(), Boolean.valueOf(split[i]));
            } else if (col.getType().equals("FLOAT")) {
                row.set(col.getName(), Float.valueOf(split[i]));
            } else {
                // Fall back to writing it as a String.
                // todo: Consider other BQ data types.
                row.set(col.getName(), split[i]);
            }
        }
        c.output(row);
    }
}
Upvotes: 0
Views: 746
Reputation: 365
Although this code might work locally with the DirectRunner, it indeed can't work with the DataflowRunner. Here's why:
The DoFns created outside of your main function do not have access to your class's variables (even static ones) under the DataflowRunner. I believe (though I'm not 100% sure) this is due to how Dataflow stages and serializes DoFns when running in the cloud.
Here's how you can overcome this issue: pass the schema to the DoFn's constructor and keep it in an instance field, which does get serialized along with the DoFn and shipped to the workers. Note that DoFn fields must themselves be Serializable, and the BigQuery TableSchema class is not, so store the schema as its JSON representation instead:
private static class TableRowConverterFn extends DoFn<String, TableRow> {
    // An instance field (not a static one!) is serialized together with the DoFn
    // and shipped to the workers. TableSchema itself is not Serializable, so hold
    // its JSON representation and rebuild the schema on the worker.
    private final String tableSchemaJson;
    private transient TableSchema tableSchema;

    public TableRowConverterFn(String tableSchemaJson) {
        this.tableSchemaJson = tableSchemaJson;
    }

    @Setup
    public void setup() throws IOException {
        tableSchema = JacksonFactory.getDefaultInstance().fromString(tableSchemaJson, TableSchema.class);
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // stuff
    }
}
Then in your main function, serialize the schema to JSON (for example with JacksonFactory.getDefaultInstance().toString(tableSchema)) and call

.apply("TransformToTableRow", ParDo.of(new TableRowConverterFn(tableSchemaJson)));
Upvotes: 2