Reputation: 1546
In my pipeline I use WriteToBigQuery something like this:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
This returns a Dict as described in the documentation as follows:
The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.
How do I print this dict and turn it into a pcollection or how do I just print the FAILED_ROWS?
If I do: | "print" >> beam.Map(print)
Then I get: AttributeError: 'dict' object has no attribute 'pipeline'
I must have read a hundred pipelines but never have I seen anything after the WriteToBigQuery.
[edit] When I finish the pipeline and store the results in a variable I have the following:
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}
But I do not know how to use this result in the pipeline like this:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
Upvotes: 5
Views: 5990
Reputation: 7058
Dead letters to handle invalid inputs are a common Beam/Dataflow usage and work with both Java and Python SDKs but there are not many examples for the latter.
Imagine that we have some dummy input data with 10 good lines and a bad row that does not conform to the table schema:
schema = "index:INTEGER,event:STRING"
data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
Then, what I do is name the write result (events
in this case):
events = (p
| "Create data" >> beam.Create(data)
| "CSV to dict" >> beam.ParDo(CsvToDictFn())
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
"{0}:dataflow_test.good_lines".format(PROJECT),
schema=schema,
)
)
and then access the FAILED_ROWS
side output:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
This works well with the DirectRunner
and writes the good lines to BigQuery:
and the bad one to a local file:
$ cat error_log.txt-00000-of-00001
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})
If you run it with the DataflowRunner
you'll need some additional flags. If you face the TypeError: 'PDone' object has no attribute '__getitem__'
error you'll need to add --experiments=use_beam_bq_sink
to use the new BigQuery sink.
If you get a KeyError: 'FailedRows'
it's because the new sink will default to load BigQuery jobs for batch pipelines:
STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.
You can override the behavior by specifying method='STREAMING_INSERTS'
in WriteToBigQuery
:
Full code for both DirectRunner
and DataflowRunner
here.
Upvotes: 18