Reputation: 230
I'm very new to Airflow/Python and can't work out what I need to do to resolve this issue.
Airflow is running via the Puckel Docker image.
The full error is:
Broken DAG: [/usr/local/airflow/dags/xxxx.py] No module named 'airflow.contrib.operators.gcs_to_gcs'
In the Python code, I've written:
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
I'm guessing I need to install the gcs_to_gcs module, but I'm not sure how to do this.
Any specific instructions would be greatly appreciated :-)
Upvotes: 6
Views: 11215
Reputation: 1656
I know this is an old question, but I just tried to use this same operator and received the same message, since Cloud Composer still does not support GoogleCloudStorageToGoogleCloudStorageOperator.
I managed to achieve what I needed with a workaround using a simple BashOperator:
from datetime import timedelta

from airflow import models
from airflow.operators.bash_operator import BashOperator

# dag_name and default_dag_args are assumed to be defined earlier in the file
with models.DAG(
        dag_name,
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:

    copy_files = BashOperator(
        task_id='copy_files',
        bash_command='gsutil -m cp <Source Bucket> <Destination Bucket>'
    )
It's very straightforward; you can create folders if you need to and rename your files.
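For instance, a minimal sketch (the bucket and object names here are hypothetical) of a task that renames a file while copying it:

# Hypothetical example: copy and rename a single object with gsutil
rename_file = BashOperator(
    task_id='rename_file',
    bash_command='gsutil cp gs://my-source-bucket/data.csv '
                 'gs://my-dest-bucket/archive/data_renamed.csv'
)

Using gsutil mv instead of cp would move (i.e. rename) the object rather than copy it.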
Upvotes: 1
Reputation: 18844
The GoogleCloudStorageToGoogleCloudStorageOperator wasn't available in v1.9.0, so you will have to copy the operator file (gcs_to_gcs.py) and the related hook (gcs_hook.py) from the Airflow source repository and paste them into the Airflow folder in your Python environment at the respective locations. Follow the steps below:
Run the following command to find where Apache Airflow is installed on your machine:
pip show apache-airflow
which should produce output similar to the following on your terminal:
Name: apache-airflow
Version: 2.0.0.dev0+incubating
Summary: Programmatically author, schedule and monitor data pipelines
Home-page: http://airflow.incubator.apache.org/
Author: Apache Software Foundation
Author-email: [email protected]
License: Apache License 2.0
Location: /Users/kaxil/anaconda2/lib/python2.7/site-packages
Requires: iso8601, bleach, gunicorn, sqlalchemy-utc, markdown, flask-caching, alembic, croniter, flask-wtf, requests, tabulate, psutil, jinja2, gitpython, python-nvd3, sqlalchemy, dill, flask, pandas, pendulum, flask-login, funcsigs, flask-swagger, flask-admin, lxml, python-dateutil, pygments, werkzeug, tzlocal, python-daemon, setproctitle, zope.deprecation, flask-appbuilder, future, configparser, thrift
Required-by:
The path after Location: is your site-packages directory; replace LINK_TO_SITE_PACKAGES_DIR in the commands below with this path.
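Alternatively, assuming airflow is already importable in that environment, a short Python snippet prints the package directory directly:

# Print the installed airflow package directory without parsing pip output
import airflow
print(airflow.__path__[0])
# e.g. /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow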
Now clone the git repo to get those two files:
# Clone the git repo to `airflow-temp` folder
git clone https://github.com/apache/incubator-airflow airflow-temp
# Copy the hook from the cloned repo to where Apache Airflow is located
# Replace LINK_TO_SITE_PACKAGES_DIR with the path you found above
cp airflow-temp/airflow/contrib/hooks/gcs_hook.py LINK_TO_SITE_PACKAGES_DIR/airflow/contrib/hooks/
# For example: for me, it would be
cp airflow-temp/airflow/contrib/hooks/gcs_hook.py /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow/contrib/hooks/
# Do the same with operator file
cp airflow-temp/airflow/contrib/operators/gcs_to_gcs.py LINK_TO_SITE_PACKAGES_DIR/airflow/contrib/operators/
# For example: for me, it would be
cp airflow-temp/airflow/contrib/operators/gcs_to_gcs.py /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow/contrib/operators/
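Before restarting, you can sanity-check the copy by importing the operator in the same Python environment:

# This import should now succeed if both files landed in the right place
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
print('gcs_to_gcs operator imported successfully')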
Re-run the Airflow webserver and scheduler, and this should now work.
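Once the import resolves, the operator can be used in a DAG roughly like this (a sketch with hypothetical bucket and object names; check the parameter list in the gcs_to_gcs.py file you copied for the exact signature):

from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

# Hypothetical buckets/objects; a wildcard in source_object copies all matches
copy_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='copy_files',
    source_bucket='my-source-bucket',
    source_object='data/*.csv',
    destination_bucket='my-dest-bucket',
    destination_object='backup/',
)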
Upvotes: 7