Jh123

Reputation: 93

How to get better log from big query schema error

I'm running into the same problem as "Error while reading data, error message: JSON table encountered too many errors, giving up. Rows" and I'm pretty sure it has to do with the schema:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_... failed. Error Result: <ErrorProto location: 'gs://dataflow/tmp/bq_load/some_file'
     message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://some_file'
     reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs-ptransform-27']

The problem here is that I have a large schema (running Dataflow jobs), and checking it by hand for small errors is tedious. Is there any way to see better error messages/get more logs that actually pinpoint which part of the schema is wrong?

Upvotes: 1

Views: 290

Answers (1)

Mazlum Tosun

Reputation: 6572

I often have the same issue with Beam Python and BigQueryIO: the error is not clear in this case, and the bad fields in the schema are not indicated.

To solve this kind of issue, I usually validate the schema or the object of each input element and send the elements in error to a dead letter queue.

Then I sink those errors to a BigQuery table for analysis.
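
For example, before WriteToBigQuery you can validate each element yourself and tag the invalid ones, so the load job never sees bad rows. Here is a minimal sketch with plain Beam, assuming an input PCollection rows of dicts and a hypothetical validate_row check:

import json

import apache_beam as beam
from apache_beam import pvalue

class ValidateAgainstSchema(beam.DoFn):
    """Routes rows that fail validation to a side output (the dead letter queue)."""

    def process(self, row):
        try:
            validate_row(row)  # hypothetical: your own schema/object check
            yield row
        except Exception as e:
            # Keep the element and the exception so the failure stays analyzable.
            yield pvalue.TaggedOutput('dead_letter', {
                'input_element': json.dumps(row),
                'exception': str(e),
            })

results = rows | 'Validate' >> beam.ParDo(ValidateAgainstSchema()).with_outputs(
    'dead_letter', main='valid')

valid_rows = results.valid              # goes to WriteToBigQuery
dead_letter_rows = results.dead_letter  # goes to an error table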

I created a library called Asgarde to simplify error handling with Beam:

# Beam pipeline with the Asgarde library.
# Assumes p is the Pipeline, and that team_names/team_countries/team_cities
# and the TeamInfo dataclass are defined elsewhere.
import apache_beam as beam
from apache_beam.pvalue import PCollection
from asgarde.collection_composer import CollectionComposer  # import path may differ by Asgarde version

input_teams: PCollection[str] = p | 'Read' >> beam.Create(team_names)

result = (CollectionComposer.of(input_teams)
            .map('Map with country', lambda tname: TeamInfo(name=tname, country=team_countries[tname], city=''))
            .map('Map with city', lambda tinfo: TeamInfo(name=tinfo.name, country=tinfo.country, city=team_cities[tinfo.name]))
            .filter('Filter french team', lambda tinfo: tinfo.country == 'France'))

result_outputs: PCollection[TeamInfo] = result.outputs
result_failures: PCollection[Failure] = result.failures

A CollectionComposer wrapper is created from a PCollection; this structure returns:

  • PCollection of good outputs
  • PCollection of failures

The failure is represented by a Failure object:

from dataclasses import dataclass

@dataclass
class Failure:
    pipeline_step: str
    input_element: str
    exception: Exception

You can sink the Failure PCollection to a BigQuery table for analysis.
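
For instance, a minimal sketch (the table name and the Failure-to-dict mapping here are my own assumptions):

import apache_beam as beam

def failure_to_row(failure):
    # Flatten the Failure dataclass into a BigQuery-writable dict.
    return {
        'pipeline_step': failure.pipeline_step,
        'input_element': failure.input_element,
        'exception': str(failure.exception),
    }

(result_failures
 | 'Failure to row' >> beam.Map(failure_to_row)
 | 'Sink failures' >> beam.io.WriteToBigQuery(
       'my_project:monitoring.job_failures',  # hypothetical table
       schema='pipeline_step:STRING,input_element:STRING,exception:STRING',
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))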

You can also check this article: Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time.

Upvotes: 2
