Reputation: 602
I'm attempting to migrate from ingestion-time (_PARTITIONTIME) partitioned tables to TIMESTAMP-partitioned tables in BigQuery. In doing so, I also need to add several REQUIRED columns. However, when I flip the switch and redirect my Dataflow job to the new TIMESTAMP-partitioned table, it breaks. Things to note:
I've been investigating the issue for a couple of days and have tried to break the transition down into the smallest steps possible. The step responsible for the error appears to be introducing the REQUIRED fields (it works fine when the same fields are NULLABLE). To avoid any possible parsing errors, I've set default values for all of the REQUIRED fields.
At the moment, I get the following combination of errors, and I'm not sure how to address any of them:
The first error repeats infrequently, but usually in groups:
Profiling Agent not found. Profiles will not be available from this worker
Occurs a lot and in large groups:
Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner
Appears to be one very large group of these:
Aborting Operations. java.lang.RuntimeException: Unable to read value from state
Towards the end, this error appears every 5 minutes, surrounded only by the mild parsing errors described below:
Processing stuck in step BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParMultiDo(WriteTables) for at least 20m00s without outputting or completing in state finish
Due to the sheer volume of data my project parses, there are several parsing errors such as Unexpected character. They're rare and shouldn't break data insertion. If they do, I have a bigger problem: the data I collect changes frequently, and I can adjust the parser only after I see the error and, therefore, the new data format. Additionally, these errors don't break the ingestion-time table (or my other TIMESTAMP-partitioned tables). That being said, here's an example of a parsing error:
Error: Unexpected character (',' (code 44)): was expecting double-quote to start field name
EDIT: Some relevant sample code:
public PipelineResult streamData() {
    try {
        GenericSection generic = new GenericSection(options.getBQProject(), options.getBQDataset(), options.getBQTable());
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
            .apply(options.getWindowDuration() + " Windowing", generic.getWindowDuration(options.getWindowDuration()))
            .apply(generic.getPubsubToString())
            .apply(ParDo.of(new CrowdStrikeFunctions.RowBuilder()))
            .apply(new BigQueryBuilder().setBQDest(generic.getBQDest())
                .setStreaming(options.getStreamingUpload())
                .setTriggeringFrequency(options.getTriggeringFrequency())
                .build());
        return pipeline.run();
    } catch (Exception e) {
        // Log the failure and signal it to the caller with a null result.
        LOG.error(e.getMessage(), e);
        return null;
    }
}
Writing to BQ. I did try to set the partitioning field here directly, but it didn't seem to affect anything:
BigQueryIO.writeTableRows()
    .to(BQDest)
    .withMethod(Method.FILE_LOADS)
    .withNumFileShards(1000)
    .withTriggeringFrequency(this.triggeringFrequency)
    .withTimePartitioning(new TimePartitioning().setType("DAY"))
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
}
Upvotes: 0
Views: 709
Reputation: 602
After a lot of digging, I found the error. My parsing logic (a try/catch) returned nothing (essentially a null row) whenever there was a parsing error. This broke the BigQuery load because my schema had several REQUIRED fields.
Since my job ran in batches, even one null row would cause the entire batch load to fail and insert nothing. This also explains why streaming inserts worked just fine. I'm surprised that BigQuery didn't throw an error saying that I was attempting to insert a null into a REQUIRED field.
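For anyone hitting the same thing, here is a rough sketch of the guard I mean: parse, check the REQUIRED fields, and set aside anything that fails instead of letting an empty row reach the load job. It is plain Java rather than Beam so it runs on its own; in a pipeline the same check would sit inside the row-building step. The class name, the field names (event_time, event_type) and the key=value;key=value input format are all made up for illustration and are not from my actual job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class RequiredFieldFilterSketch {
    // Hypothetical REQUIRED column names; substitute the ones from the real schema.
    static final List<String> REQUIRED_FIELDS = Arrays.asList("event_time", "event_type");

    // Stand-in parser for "key=value;key=value" input (an assumed format).
    // Returns Optional.empty() on a parse failure instead of a null row.
    static Optional<Map<String, String>> parse(String raw) {
        if (raw == null || raw.isEmpty()) {
            return Optional.empty();
        }
        Map<String, String> row = new HashMap<>();
        for (String pair : raw.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length != 2 || kv[0].isEmpty()) {
                return Optional.empty();
            }
            row.put(kv[0], kv[1]);
        }
        return Optional.of(row);
    }

    // True only if every REQUIRED field is present and non-empty.
    static boolean hasAllRequired(Map<String, String> row) {
        for (String field : REQUIRED_FIELDS) {
            String value = row.get(field);
            if (value == null || value.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Returns the rows that are safe to load; everything else is appended
    // to 'rejected' so it can be logged or inspected later.
    static List<Map<String, String>> loadable(List<String> rawMessages, List<String> rejected) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (String raw : rawMessages) {
            Optional<Map<String, String>> parsed = parse(raw);
            if (parsed.isPresent() && hasAllRequired(parsed.get())) {
                rows.add(parsed.get());
            } else {
                rejected.add(raw);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> rejected = new ArrayList<>();
        List<Map<String, String>> rows = loadable(
            Arrays.asList("event_time=2020-01-01T00:00:00Z;event_type=login", "not parseable"),
            rejected);
        System.out.println("loadable=" + rows.size() + " rejected=" + rejected.size());
    }
}
```

Keeping the rejected inputs around (rather than silently dropping them) matters here because the incoming format changes often, and the rejected messages are what show the new format.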
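In reaching this conclusion, I also realized that the partition field has to be set in my code as well, not just in the schema. It can be done by calling setField on the TimePartitioning object, for example:
.withTimePartitioning(new TimePartitioning().setType("DAY").setField(partitionField))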
Upvotes: 1