Reputation: 225
I am trying to run a simple Beam script on GCP Dataflow to apply a scikit-learn model to some data. The data needs to be processed before and after the model is applied; that is what the textExtraction and translationDictionary objects are for. I keep getting the error AttributeError: module 'google.cloud' has no attribute 'storage'
(full stack trace below). As you can see, I tried running in a fresh virtual environment with new installations. Any idea how to fix this?
My script is given below.
predict_DF_class.py
import apache_beam as beam
import argparse
from google.cloud import storage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import pandas as pd
import pickle as pkl
import json
import joblib
query = """
SELECT index, product_title
FROM `project.dataset.table`
LIMIT 1000
"""
class ApplyDoFn(beam.DoFn):
    def __init__(self):
        # Hold module references on the DoFn so workers can use them;
        # model artifacts are loaded lazily on first use.
        self._model = None
        self._textExtraction = None
        self._translationDictionary = None
        self._storage = storage
        self._pkl = pkl
        self._pd = pd
        self._joblib = joblib

    def process(self, element):
        # Download the model and preprocessing artifacts from GCS the
        # first time this worker processes an element.
        if self._model is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('model.joblib')
            self._model = self._joblib.load(blob.download_as_string())
        if self._textExtraction is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('tfidfit')
            self._textExtraction = self._pkl.loads(blob.download_as_string())
        if self._translationDictionary is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('id_to_category')
            self._translationDictionary = self._pkl.loads(
                blob.download_as_string())
        new_x = self._pd.DataFrame.from_dict(
            element, orient="index").transpose().fillna(0)
        course = self._translationDictionary[self._model.predict(
            self._textExtraction.transform(new_x.iloc[:, 1:]).toarray()).tolist()[0]]
        return [{'guid': element['index'],
                 'product_title': element['product_title'],
                 'course_id': course}]
schema = parse_table_schema_from_json(json.dumps({'fields': [
    {'name': 'index', 'type': 'INTEGER'},
    {'name': 'product_title', 'type': 'STRING'},
    {'name': 'course_id', 'type': 'STRING'}]}))
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)
# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
data = p | 'Read from BigQuery' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))
scored = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
scored | 'Save to BigQuery' >> beam.io.Write(beam.io.gcp.bigquery.WriteToBigQuery(
    'output_table', 'dataset', 'project', schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
# run the pipeline
result = p.run()
result.wait_until_finish()
Error:
(env) (base) HOBNJML-C40PLVD:predictions uswygst$ python predict_DF_class.py --runner DataflowRunner --project $PROJECT -requirements_file "requirements.txt" --temp_location gs://marketing-analytics-data/template/
predict_DF_class.py:53: BeamDeprecationWarning: parse_table_schema_from_json is deprecated since 2.11.0. Use bigquery_tools.parse_table_schema_from_json instead.
{ 'name': 'courseid', 'type': 'STRING'} ]}))
/Users/uswygst/Documents/Company/Projects/AAA/predictions/env/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1479: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
experiments = p.options.view_as(DebugOptions).experiments or []
/Users/uswygst/Documents/Company/Projects/AAA/predictions/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/ptransform_overrides.py:315: BeamDeprecationWarning: BigQuerySink is deprecated since 2.11.0. Use WriteToBigQuery instead.
kms_key=self.kms_key))
WARNING: You are using pip version 19.2.3, however version 20.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
WARNING: You are using pip version 19.2.3, however version 20.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-requirements_file', 'requirements.txt']
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-requirements_file', 'requirements.txt']
Traceback (most recent call last):
File "predict_DF_class.py", line 71, in <module>
result.wait_until_finish()
File "/Users/uswygst/Documents/Company/Projects/AAA/predictions/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1629, in wait_until_finish
self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
AttributeError: module 'google.cloud' has no attribute 'storage'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 616, in apache_beam.runners.worker.operations.DoOperation.setup
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
AttributeError: module 'google.cloud' has no attribute 'storage'
Upvotes: 0
Views: 2239
Reputation: 1977
Dataflow isn't reading requirements.txt:
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-requirements_file', 'requirements.txt']
Note that you need double hyphens for the requirements_file flag. With a single hyphen the option is discarded (see the warning above), so the requirements file is never staged, google-cloud-storage is missing on the workers, and unpickling the DoFn there raises the AttributeError.
i.e. -requirements_file "requirements.txt"
-> --requirements_file requirements.txt
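For reference, the corrected invocation would look something like this (project variable, bucket, and file names taken from the question):

python predict_DF_class.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --requirements_file requirements.txt \
    --temp_location gs://marketing-analytics-data/template/

The requirements.txt then has to list every third-party package the DoFn imports, so Dataflow installs them on the workers. A minimal sketch based on the script's imports (exact pins and versions are an assumption):

google-cloud-storage
pandas
scikit-learn
joblib

Once the file is actually staged, the workers can import google.cloud.storage and the unpickling error should go away.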
Upvotes: 5