swygerts

Reputation: 223

Dataflow Worker: Failed to install packages: failed to install requirements: exit status 1

I am trying to mimic the walkthrough here in order to build a pipeline that applies an sklearn model to my data, but I am running into an error. My command-line input is below:

(venv) computer:predictions uswygst$ python predictions.py   \
--runner DataflowRunner \
--project my_project    \
--requirements_file "requirements.txt"   \
--temp_location gs://my_bucket/template/   \
--worker_machine_type n1-standard-8  \
--num_workers 5

This command results in the following error:

predictions.py:57: BeamDeprecationWarning: parse_table_schema_from_json is deprecated since 2.11.0. Use bigquery_tools.parse_table_schema_from_json instead.
  { 'name': 'title', 'type': 'STRING'}]}))
/opt/anaconda3/envs/venv/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1479: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  experiments = p.options.view_as(DebugOptions).experiments or []
/opt/anaconda3/envs/venv/lib/python3.7/site-packages/apache_beam/runners/dataflow/ptransform_overrides.py:315: BeamDeprecationWarning: BigQuerySink is deprecated since 2.11.0. Use WriteToBigQuery instead.
  kms_key=self.kms_key))
You are using pip version 9.0.3, however version 20.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
You are using pip version 9.0.3, however version 20.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
  File "predictions.py", line 75, in <module>
    result.wait_until_finish()
  File "/opt/anaconda3/envs/venv/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1629, in wait_until_finish
    self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
A setup error was detected in beamapp-uswygst-091501355-09141837-d5ts-harness-qdhg. Please refer to the worker-startup log for detailed information.

When I look at the worker logs, I see this:

Failed to install packages: failed to install requirements: exit status 1

The code for my single python file that I am submitting is

import apache_beam as beam
import argparse
from google.cloud import storage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import pandas as pd
import pickle as pkl
import joblib
import json

query = """
    SELECT index, product_title
    FROM `my_project.my_dataset.my_table`
"""

class ApplyDoFn(beam.DoFn):
    def __init__(self):
        self._model = None
        self._textExtraction = None
        self._translationDictionary = None
        self._storage = storage
        self._pkl = pkl
        self._pd = pd
        self._joblib = joblib

    def process(self, element):
        if self._textExtraction is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('tfidfit')
            self._textExtraction = pkl.loads(blob.download_as_string(), encoding='latin-1')
        if self._translationDictionary is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('id_to_category')
            self._translationDictionary = self._pkl.loads(blob.download_as_string())
        if self._model is None:
            bucket = self._storage.Client().get_bucket(
                'marketing-analytics-data')
            blob = bucket.get_blob('model.joblib')
            model_local='local_model'
            blob.download_to_filename(model_local)
            #load that file from local file
            self._model=joblib.load(model_local)

        new_x = self._pd.DataFrame.from_dict(element,
                                             orient="index").transpose().fillna(0)
        id = self._model.predict(
            self._textExtraction.transform(new_x.iloc[:, 1]).toarray()).tolist()[0]

        return [{'index': element['index'], 'product_title':element['title'], 'title': self._translationDictionary[id]}]

schema = parse_table_schema_from_json(json.dumps({'fields':
            [ { 'name': 'index', 'type': 'INTEGER'},
              { 'name': 'product_title', 'type': 'STRING'},
              { 'name': 'title', 'type': 'STRING'}]}))

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)

# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
data = p | 'Read from BigQuery' >> beam.io.Read(
       beam.io.BigQuerySource(query=query, use_standard_sql=True))
scored = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
scored | 'Save to BigQuery' >> beam.io.Write(
    beam.io.gcp.bigquery.WriteToBigQuery(
        'my_table', 'my_dataset', 'my_project', schema=schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# run the pipeline
result = p.run()
result.wait_until_finish()

and my requirements.txt is

google-cloud-storage==1.19.0
scikit-learn==0.23.1

Any ideas how to fix this?

Upvotes: 0

Views: 2854

Answers (1)

swygerts

Reputation: 223

As @Enrique Zetina noted, the solution is described in detail here.
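
For reference, one common workaround described in the Beam dependency-management docs is to ship the dependencies through a setup.py passed with --setup_file rather than via --requirements_file. A rough sketch only (the package name and version are placeholders; install_requires simply mirrors the requirements.txt above):

    # setup.py -- sketch only; name and version are placeholders
    import setuptools

    setuptools.setup(
        name='predictions',
        version='0.0.1',
        install_requires=[
            'google-cloud-storage==1.19.0',
            'scikit-learn==0.23.1',
        ],
        packages=setuptools.find_packages(),
    )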

However, this raised another error about package names on the Dataflow workers, because names imported at module level of the main script are not available on the workers by default. To fix it, move the imports inside the function:

    def run():
        import apache_beam as beam
        import argparse
        from google.cloud import storage
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.options.pipeline_options import SetupOptions
        from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
        import pandas as pd
        import pickle as pkl
        import joblib
        import json
        ...

    if __name__ == '__main__':
        run()
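
Alternatively, since the script already imports SetupOptions, the usual Beam way to make module-level imports visible on the workers is to pickle the main session instead of moving the imports. A two-line sketch against the existing pipeline_options:

    # keep the top-level imports, but ship the main session to the workers
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True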

Also, if the dataset is big, you will want to use the --max_num_workers argument (which caps autoscaling) instead of --num_workers (which sets the initial worker count).
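
For example, the launch command from the question becomes something like this (the project, bucket, and machine type are the same placeholders as before, and the dependency flag assumes the setup.py sketch above):

    python predictions.py \
        --runner DataflowRunner \
        --project my_project \
        --setup_file ./setup.py \
        --temp_location gs://my_bucket/template/ \
        --worker_machine_type n1-standard-8 \
        --max_num_workers 5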

Upvotes: 1
