Reputation: 223
I am trying to mimic this walkthrough here in order to build a pipeline that applies an sklearn model to my data, but I am running into an error. My command-line input is below:
(venv) computer:predictions uswygst$ python predictions.py \
--runner DataflowRunner \
--project my_project \
--requirements_file "requirements.txt" \
--temp_location gs://my_bucket/template/ \
--worker_machine_type n1-standard-8 \
--num_workers 5
This command results in the following error:
predictions.py:57: BeamDeprecationWarning: parse_table_schema_from_json is deprecated since 2.11.0. Use bigquery_tools.parse_table_schema_from_json instead.
{ 'name': 'title', 'type': 'STRING'}]}))
/opt/anaconda3/envs/venv/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1479: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
experiments = p.options.view_as(DebugOptions).experiments or []
/opt/anaconda3/envs/venv/lib/python3.7/site-packages/apache_beam/runners/dataflow/ptransform_overrides.py:315:
BeamDeprecationWarning: BigQuerySink is deprecated since 2.11.0. Use WriteToBigQuery instead.
kms_key=self.kms_key))
You are using pip version 9.0.3, however version 20.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
You are using pip version 9.0.3, however version 20.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
File "predictions.py", line 75, in <module>
result.wait_until_finish()
File "/opt/anaconda3/envs/venv/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1629, in wait_until_finish
self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
A setup error was detected in beamapp-uswygst-091501355-09141837-d5ts-harness-qdhg. Please refer to the worker-startup log for detailed information.
When I look at the worker logs, I see this:
Failed to install packages: failed to install requirements: exit status 1
The code for the single Python file that I am submitting is:
import apache_beam as beam
import argparse
from google.cloud import storage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import pandas as pd
import pickle as pkl
import joblib
import json
query = """
SELECT index, product_title
FROM `my_project.my_dataset.my_table`
"""
class ApplyDoFn(beam.DoFn):
    def __init__(self):
        self._model = None
        self._textExtraction = None
        self._translationDictionary = None
        self._storage = storage
        self._pkl = pkl
        self._pd = pd
        self._joblib = joblib

    def process(self, element):
        if self._textExtraction is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('tfidfit')
            self._textExtraction = self._pkl.loads(blob.download_as_string(),
                                                   encoding='latin-1')
        if self._translationDictionary is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('id_to_category')
            self._translationDictionary = self._pkl.loads(blob.download_as_string())
        if self._model is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('model.joblib')
            model_local = 'local_model'
            blob.download_to_filename(model_local)
            # load the model from the local file
            self._model = self._joblib.load(model_local)
        new_x = self._pd.DataFrame.from_dict(element,
                                             orient="index").transpose().fillna(0)
        id = self._model.predict(
            self._textExtraction.transform(new_x.iloc[:, 1]).toarray()).tolist()[0]
        return [{'index': element['index'],
                 'product_title': element['title'],
                 'title': self._translationDictionary[id]}]
schema = parse_table_schema_from_json(json.dumps({'fields': [
    {'name': 'index', 'type': 'INTEGER'},
    {'name': 'product_title', 'type': 'STRING'},
    {'name': 'title', 'type': 'STRING'}]}))

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)

# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
data = p | 'Read from BigQuery' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))
scored = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
scored | 'Save to BigQuery' >> beam.io.Write(beam.io.gcp.bigquery.WriteToBigQuery(
    'my_table', 'my_dataset', 'my_project', schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
# run the pipeline
result = p.run()
result.wait_until_finish()
and my requirements.txt is
google-cloud-storage==1.19.0
scikit-learn==0.23.1
Any ideas how to fix this?
Upvotes: 0
Views: 2854
Reputation: 223
As @Enrique Zetina noted, the solution is described in detail here.
However, this raised another error about package names not being found on the Dataflow workers. To fix this, import the packages inside the function:
def run():
    import apache_beam as beam
    import argparse
    from google.cloud import storage
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
    import pandas as pd
    import pickle as pkl
    import joblib
    import json
    ...

if __name__ == '__main__':
    run()
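This helps because the Dataflow workers unpickle and run your DoFn without the submission script's module-level imports being available by default. As a complementary option, here is a minimal sketch (assuming the rest of the script stays as in the question) that keeps the top-level imports and ships the main session to the workers through the SetupOptions import the script already has:

pipeline_options = PipelineOptions(pipeline_args)
# Pickle the main module's global state (including imports) and make it
# available on the Dataflow workers.
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)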
Also, if the dataset is big, you will want to use the max_num_workers argument instead of num_workers.
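For example, the submission command from the question would change only in its final flag (the value 5 is simply carried over from the original command for illustration), and Dataflow will then autoscale up to that many workers:

python predictions.py \
    --runner DataflowRunner \
    --project my_project \
    --requirements_file "requirements.txt" \
    --temp_location gs://my_bucket/template/ \
    --worker_machine_type n1-standard-8 \
    --max_num_workers 5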
Upvotes: 1