Reputation: 93
I'm running into the same problem as described in Error while reading data, error message: JSON table encountered too many errors, giving up. Rows, and I'm pretty sure it has to do with the schema:
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_... failed. Error Result: <ErrorProto location: 'gs://dataflow/tmp/bq_load/some_file'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://some_file'
reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs-ptransform-27']
The problem here is that I have a large schema (for Dataflow jobs), and checking it by hand for small errors is tedious. Is there any way to see better error messages or get more logs that actually pinpoint which part of the schema is wrong?
Upvotes: 1
Views: 290
Reputation: 6572
I often have the same issue with Beam Python and BigQueryIO: the error is not clear in this case, and the bad fields in the schema are not indicated.
To solve this kind of issue, I usually validate the schema or object on each input element and route failing elements to a dead letter queue. Then I sink the errors to a BigQuery table for analysis.
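Before reaching for a library, the same pattern can be written with plain Beam tagged outputs. Below is a minimal sketch, assuming a hypothetical ValidateElement DoFn, an EXPECTED_FIELDS set and dictionary elements (none of these come from the original post); it routes invalid rows to a dead letter output instead of letting the BigQuery load job fail:
# Minimal sketch of the validate-then-dead-letter pattern in plain Beam Python.
# EXPECTED_FIELDS and the sample rows below are assumptions for illustration.
import json
import logging

import apache_beam as beam
from apache_beam import pvalue

EXPECTED_FIELDS = {'name', 'country', 'city'}  # hypothetical BigQuery schema fields


class ValidateElement(beam.DoFn):
    DEAD_LETTER_TAG = 'dead_letter'

    def process(self, element):
        try:
            # Fail fast on anything the BigQuery schema would reject.
            unknown = set(element) - EXPECTED_FIELDS
            if unknown:
                raise ValueError(f'Unknown fields for BigQuery schema: {unknown}')
            yield element
        except Exception as exc:
            # Keep the original element plus the reason, so it can be analysed later.
            yield pvalue.TaggedOutput(
                self.DEAD_LETTER_TAG,
                {'input_element': json.dumps(element), 'error': str(exc)})


with beam.Pipeline() as p:
    rows = p | 'Read' >> beam.Create([
        {'name': 'PSG', 'country': 'France', 'city': 'Paris'},
        {'name': 'Typo', 'countrey': 'France'},  # bad field name, goes to dead letter
    ])
    results = rows | 'Validate' >> beam.ParDo(ValidateElement()).with_outputs(
        ValidateElement.DEAD_LETTER_TAG, main='valid')
    valid = results.valid
    dead_letter = results[ValidateElement.DEAD_LETTER_TAG]
    # valid would go to WriteToBigQuery; dead_letter to an error table or log.
    dead_letter | 'LogErrors' >> beam.Map(logging.error)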
I created a library called Asgarde to simplify error handling with Beam:
# Beam pipeline with the Asgarde library.
# Assumes team_names, team_countries and team_cities collections and a TeamInfo
# dataclass are defined elsewhere, and that CollectionComposer is imported from asgarde.
input_teams: PCollection[str] = p | 'Read' >> beam.Create(team_names)

result = (CollectionComposer.of(input_teams)
          .map('Map with country', lambda tname: TeamInfo(name=tname, country=team_countries[tname], city=''))
          .map('Map with city', lambda tinfo: TeamInfo(name=tinfo.name, country=tinfo.country, city=team_cities[tinfo.name]))
          .filter('Filter french team', lambda tinfo: tinfo.country == 'France'))

result_outputs: PCollection[TeamInfo] = result.outputs
result_failures: PCollection[Failure] = result.failures
A wrapper CollectionComposer is created from a PCollection. This structure returns a PCollection of good outputs and a PCollection of failures. A failure is represented by a Failure object:
from dataclasses import dataclass

@dataclass
class Failure:
    pipeline_step: str
    input_element: str
    exception: Exception
You can then sink the Failure PCollection to a BigQuery table for analysis.
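For example, here is a minimal sketch of that sink; the error table name, dataset and schema string are assumptions, and result_failures is the PCollection from the snippet above:
# Minimal sketch, assuming result_failures from the Asgarde snippet and a
# hypothetical my_project:monitoring.job_failures table.
import apache_beam as beam

failure_rows = result_failures | 'FailureToRow' >> beam.Map(
    lambda f: {
        'pipeline_step': f.pipeline_step,
        'input_element': f.input_element,
        'exception': repr(f.exception),
    })

failure_rows | 'WriteFailures' >> beam.io.WriteToBigQuery(
    'my_project:monitoring.job_failures',  # hypothetical error table
    schema='pipeline_step:STRING,input_element:STRING,exception:STRING',
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)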
You can also check this article: Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time.
Upvotes: 2