Reputation: 14485
I am having an issue trying to run a GCP Cloud-Dataflow pipeline.
The pipeline works when running locally using the "DirectRunner" but fails when trying to run in dataflow with the "DataflowRunner".
It fails immediately when run() is called on the pipeline, with the error message linked in full below (as opposed to first deploying to GCP and then failing while the pipeline actually runs).
The exception is thrown inside a call to beam.io.WriteToBigQuery:
(bq_rows
 | 'map_to_row' >> beam.Map(to_pred_row)
 | 'write_to_table' >> beam.io.WriteToBigQuery(
       'my_dataset_name.my_table_name',
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
If I replace the last node in the pipeline with something that just writes to a file:
(bq_rows
 | 'map_to_row' >> beam.Map(to_pred_row)
 | 'debug_write_to_csv_2' >> beam.io.WriteToText(additional_opts.out_path, ".txt"))
Then everything works as expected and I get a text file with all the records I would expect.
If I run everything as-is with the WriteToBigQuery() function but switch back to the DirectRunner (changing nothing else), then everything works and the new rows are written to the BQ table.
As far as I can tell there is nothing remarkable about the records flowing into the WriteToBigQuery node. I have output these to a text file both running locally and in the cloud in an effort to isolate a cause for this error, but both outputs look identical (and match the schema of the destination table). In any event, it doesn't seem like things get far enough for an unexpected value or parameter to matter when running the flow; as mentioned above, this error happens as soon as I call run() on the pipeline.
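For reference, one way to sanity-check the rows inside the pipeline itself is a small validating transform in front of the sink. This is a minimal sketch, not part of the original pipeline; the validate_row helper and the EXPECTED_FIELDS set are hypothetical and would need to match the real destination table:

# Hypothetical validation step, assuming the destination table's columns are known.
EXPECTED_FIELDS = {'word'}  # replace with the destination table's column names

def validate_row(row):
    # Fail loudly, with the offending record, before the BigQuery sink sees it.
    assert set(row.keys()) == EXPECTED_FIELDS, 'unexpected row shape: {}'.format(row)
    return row

# ... | beam.Map(to_pred_row) | 'validate' >> beam.Map(validate_row) | 'write_to_table' >> ...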
Where am I going wrong?
UPDATE:
Here is a minimal example of the same behaviour. Having created a table named temp_e.words with a single (STRING, REQUIRED) column named word (a sketch of one way to create such a table appears after the pipeline code below), I can reproduce the behaviour with this code:
import datetime
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  StandardOptions)


def to_row(word):
    return {'word': word}


def run_pipeline(local_mode):
    PROJECT = 'searchlab-data-insights'
    REGION = 'us-central1'
    GCS_BUCKET_PATH = 'gs://temp-staging-e'
    timestamp = datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    options = beam.pipeline.PipelineOptions(['--project', PROJECT])

    if local_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT
    google_cloud_options.job_name = 'test-{}'.format(timestamp)
    google_cloud_options.staging_location = os.path.join(GCS_BUCKET_PATH, 'staging')
    google_cloud_options.temp_location = os.path.join(GCS_BUCKET_PATH, 'tmp')
    options.view_as(StandardOptions).runner = RUNNER

    p = beam.Pipeline(RUNNER, options=options)
    bq_rows = p | beam.Create(['words', 'to', 'store'])
    (bq_rows
     | 'map_to_row' >> beam.Map(to_row)
     | 'write_to_table' >> beam.io.WriteToBigQuery(
           'temp_e.words',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
    job = p.run()
    if local_mode:
        job.wait_until_finish()
    print("Done!")
Now running run_pipeline(local_mode=True) produces the correct result and the rows are appended, whereas running run_pipeline(local_mode=False) immediately triggers the error.
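For completeness, here is a minimal sketch of one way to create the temp_e.words table described above with the google-cloud-bigquery client. This is not from the original post and assumes a reasonably recent client library and default credentials:

from google.cloud import bigquery as bq

client = bq.Client(project='searchlab-data-insights')
table = bq.Table(
    'searchlab-data-insights.temp_e.words',
    schema=[bq.SchemaField('word', 'STRING', mode='REQUIRED')])
client.create_table(table)  # creates the table with the single REQUIRED 'word' column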
The full error generated is here: https://pastebin.com/xx8wwtXV
Upvotes: 0
Views: 1490
Reputation: 14485
This issue appears to occur only when no schema is provided in the call to beam.io.WriteToBigQuery. It seems that the DirectRunner can fall back on the existing table's schema but the DataflowRunner cannot.
In the absence of a better answer, we can work around it by explicitly providing the schema.
So, for example, in the minimal example above we could use this:
(bq_rows
 | 'map_to_row' >> beam.Map(to_row)
 | 'write_to_table' >> beam.io.WriteToBigQuery(
       'temp_e.words',
       schema={"fields": [{"type": "STRING", "name": "word", "mode": "REQUIRED"}]},
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
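To avoid maintaining the schema dict by hand, one option is to read it off the existing table and rebuild it in the form WriteToBigQuery accepts. A minimal sketch, assuming a reasonably recent google-cloud-bigquery client and that the table already exists (the table_schema_as_dict helper is hypothetical, not part of Beam):

from google.cloud import bigquery as bq

def table_schema_as_dict(project, dataset_id, table_id):
    # Fetch the live table and rebuild the JSON-style schema dict from its fields.
    client = bq.Client(project=project)
    table = client.get_table('{}.{}.{}'.format(project, dataset_id, table_id))
    return {'fields': [{'name': f.name, 'type': f.field_type, 'mode': f.mode}
                       for f in table.schema]}

WriteToBigQuery also accepts a compact string schema such as 'word:STRING', though as far as I know that form cannot express the REQUIRED mode, so the dict form is safer here.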
Upvotes: 1