Vidya K

Reputation: 103

Unable to use CsvFileSource from beam_utils.sources in apache beam program on Cloud Dataflow

I can install beam_utils, but importing CsvFileSource fails.

I need this import to run my Cloud Dataflow program.

The code contains:

from beam_utils.sources import CsvFileSource

Error message:

>>> from beam_utils.sources import CsvFileSource
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vk/.local/lib/python2.7/site-packages/beam_utils/sources.py", line 14, in <module>
    class JsonLinesFileSource(beam.io.filebasedsource.FileBasedSource):
  File "/home/vk/.local/lib/python2.7/site-packages/beam_utils/sources.py", line 17, in JsonLinesFileSource
    compression_type=fileio.CompressionTypes.AUTO,
AttributeError: 'module' object has no attribute 'CompressionTypes'
>>>

I even tried the import using Python 3. Any idea how I can work around this?

Upvotes: 1

Views: 356

Answers (2)

Apache Beam was recently updated, and a few of its modules and attributes have changed.

In particular, CompressionTypes has moved from the fileio module to the filesystem module. As a quick fix, you can edit the beam_utils source (python_home\lib\site-packages\beam_utils\sources.py) and replace 'fileio' with 'filesystem'. It should work ;)

If you take a look at the GitHub repo (https://github.com/pabloem/beam_utils/blob/master/beam_utils/sources.py), the changes are already there. I guess it's only a matter of time until they're published so that pip picks them up!
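
If you'd rather not hard-code one module name while patching, here is a minimal sketch of a version-tolerant lookup. It assumes CompressionTypes lives in apache_beam.io.filesystem on newer Beam releases and in apache_beam.io.fileio on older ones; resolve_attr is a hypothetical helper name, not part of Beam or beam_utils.

```python
import importlib


def resolve_attr(module_names, attr):
    """Return `attr` from the first importable module that defines it.

    Hypothetical helper: module_names are tried in order, so list the
    preferred (newer) location first.
    """
    for name in module_names:
        try:
            module = importlib.import_module(name)
        except ImportError:
            # Module is not available in this environment; try the next one.
            continue
        if hasattr(module, attr):
            return getattr(module, attr)
    raise ImportError(
        "none of {0} provides {1!r}".format(", ".join(module_names), attr))
```

In a patched sources.py this could be used as CompressionTypes = resolve_attr(['apache_beam.io.filesystem', 'apache_beam.io.fileio'], 'CompressionTypes'), with the class default changed to CompressionTypes.AUTO. I have not tested this against every Beam version.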

Upvotes: 1

TheLemon

Reputation: 33

I'm getting the same error. I'm attempting to load a CSV file into a dictionary and then write it to a local file (eventually to BigQuery).

   argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/'.format(BUCKET),
      '--temp_location=gs://{0}/'.format(BUCKET),
      #'--runner=DataflowRunner'
      '--runner=DirectRunner'
   ]

   p = beam.Pipeline(argv=argv)
   rows = (p
           | 'ReadCSV' >> beam.io.Read(CsvFileSource('gs://blahblah/file.csv'))
           | 'Write to file' >> beam.io.WriteToText('strings', file_name_suffix='.txt'))


-[snip]-

 Apache Beam will soon support Python 3 only.
  'You are using Apache Beam with Python 2. '
Traceback (most recent call last):
  File "avg-ecom-rating.py", line 5, in <module>
    from beam_utils.sources import CsvFileSource
  File "/home/dlemon/env/local/lib/python2.7/site-packages/beam_utils/sources.py", line 14, in <module>
    class JsonLinesFileSource(beam.io.filebasedsource.FileBasedSource):
  File "/home/dlemon/env/local/lib/python2.7/site-packages/beam_utils/sources.py", line 17, in JsonLinesFileSource
    compression_type=fileio.CompressionTypes.AUTO,
AttributeError: 'module' object has no attribute 'CompressionTypes'
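
One possible way around the import entirely, sketched here as an assumption and not as what beam_utils does internally: read the file as plain text lines and turn each line into a dictionary with the standard csv module. csv_line_to_dict and the column names are hypothetical, and this simple version will not handle quoted fields that contain newlines.

```python
import csv


def csv_line_to_dict(line, fieldnames):
    """Parse a single CSV-formatted line into a dict keyed by fieldnames."""
    # csv.reader expects an iterable of lines, so wrap the one line in a list.
    values = next(csv.reader([line]))
    return dict(zip(fieldnames, values))
```

In a pipeline this could be applied with beam.Map(csv_line_to_dict, fieldnames) after beam.io.ReadFromText(..., skip_header_lines=1), assuming the header names are known up front.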


Upvotes: 0
