Mayan Salama

Reputation: 51

Cloud Dataflow Writing to BigQuery Python Errors

I'm writing a simple Beam job to copy data from a GCS bucket over to BigQuery. The code looks like the following:

import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions

pipeline_options = GoogleCloudOptions(flags=sys.argv[1:])
pipeline_options.project = PROJECT_ID
pipeline_options.region = 'us-west1'
pipeline_options.job_name = JOB_NAME
pipeline_options.staging_location = BUCKET + '/binaries'
pipeline_options.temp_location = BUCKET + '/temp'

schema = 'id:INTEGER,region:STRING,population:INTEGER,sex:STRING,age:INTEGER,education:STRING,income:FLOAT,statusquo:FLOAT,vote:STRING'
p = (beam.Pipeline(options = pipeline_options)
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('Chile.csv')
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('project:tmp.dummy', schema = schema))

Here we're writing to the table tmp.dummy in the project project. Running this produces the following stack trace:

Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 151, in _run_module_as_main
    mod_name, loader, code, fname = _get_module_details(mod_name)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 101, in _get_module_details
    loader = get_loader(mod_name)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 464, in get_loader
    return find_loader(fullname)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 474, in find_loader
    for importer in iter_importers(fullname):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 430, in iter_importers
    __import__(pkg)
  File "WriteToBigQuery.py", line 49, in <module>
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(str(PROJECT_ID + ':' + pipeline_options.write_file), schema = schema))
  File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1337, in __init__
    self.table_reference = _parse_table_reference(table, dataset, project)
  File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 309, in _parse_table_reference
    if isinstance(table, bigquery.TableReference):
AttributeError: 'module' object has no attribute 'TableReference'

It looks like some import is going wrong somewhere; could this be a result of using GoogleCloudOptions for the pipeline options?
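For reference, the pattern in the Beam examples parses all flags into a generic PipelineOptions and then uses view_as for the Google Cloud specific settings. A sketch of that, with hypothetical 'my-project' and 'gs://my-bucket' placeholders:

import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

# Sketch: parse the command-line flags into generic PipelineOptions,
# then view them as GoogleCloudOptions to set the GCP-specific fields.
# 'my-project' and 'gs://my-bucket' are hypothetical placeholders.
options = PipelineOptions(flags=sys.argv[1:])
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.project = 'my-project'
gcloud_options.region = 'us-west1'
gcloud_options.staging_location = 'gs://my-bucket/binaries'
gcloud_options.temp_location = 'gs://my-bucket/temp'

p = beam.Pipeline(options=options)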

Upvotes: 3

Views: 7342

Answers (2)

mohammed_ayaz

Reputation: 679

I got the same error and realized I had installed the wrong Apache Beam package. You need to add [gcp] to the package name when installing Apache Beam:

sudo pip install apache_beam[gcp]

A couple of optional installs fix remaining dependency errors, and you are good to go:

sudo pip install oauth2client==3.0.0
sudo pip install httplib2==0.9.2
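To confirm the extras took effect, a quick sanity check (a sketch; the module path is the one from the question's traceback):

# With apache_beam[gcp] installed, the module from the question's traceback
# exposes TableReference; on a broken install the import fails or the
# attribute is missing, which matches the AttributeError above.
from apache_beam.io.gcp.internal.clients import bigquery
print(hasattr(bigquery, 'TableReference'))  # expect: True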

Upvotes: 6

Guillem Xercavins

Reputation: 7058

I ran some tests and was unable to reproduce your issue (does the dataset already exist?). The following snippet worked for me (I'm using an answer for better formatting):

import apache_beam as beam

PROJECT = 'PROJECT_ID'
BUCKET = 'BUCKET_NAME'
schema = 'id:INTEGER,region:STRING'

class Split(beam.DoFn):
    """Parses a CSV line into a dict matching the BigQuery schema."""

    def process(self, element):
        id, region = element.split(",")

        return [{
            'id': int(id),
            'region': region,
        }]

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)

    (p
        | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/staging/dummy.csv'.format(BUCKET))
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:test.dummy'.format(PROJECT), schema=schema)
    )

    p.run()

if __name__ == '__main__':
    run()

where dummy.csv contains:

$ cat dummy.csv
1,us-central1
2,europe-west1

and output in BigQuery is:

[Screenshot: the test.dummy table in BigQuery showing the two ingested rows.]

Some relevant dependencies used:

apache-beam==2.4.0
google-cloud-bigquery==0.25.0
google-cloud-dataflow==2.4.0
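On the dataset question: as far as I know, WriteToBigQuery can create the table for you (the default create disposition) but not the dataset, which must exist beforehand. A sketch of the same sink with the dispositions spelled out, reusing PROJECT and schema from the snippet above:

# Sketch: the same sink with explicit dispositions. CREATE_IF_NEEDED
# creates the table if it is missing, but the dataset ('test' here)
# must already exist in the project.
write = beam.io.WriteToBigQuery(
    '{0}:test.dummy'.format(PROJECT),
    schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)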

Upvotes: 4
