Reputation: 698
I have a DAG in Google Cloud Composer (Airflow) that imports:
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
When I run the DAG, I get this error:
"ImportError: No module named sensors.base_sensor_operator"
Basically, I want to check whether a file exists in a bucket before doing anything else.
Here is the complete Python code:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor

CONNECTION_ID = 'something'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 21, 0, 0),
}

def print_hello():
    return 'youtube folder exists!!'

with DAG('DATA_TRANSFER_GCP_BUCKET_TO_BQ2',
         schedule_interval=timedelta(days=1),
         default_args=default_args) as dag:
    gcp_sensorBucket = GoogleCloudStorageObjectSensor(
        task_id='gcp_sensorbucket',
        bucket='/aa_youtube_new/2018/06/04/',
        #bucket='{{var.value.gcp_youtube_video_bucket}}/2018/06/04/',
        object='*.csv',
        google_cloud_conn_id=CONNECTION_ID
    )
    hello_operator = PythonOperator(task_id='hello_task',
                                    python_callable=print_hello)
    hello_operator.set_upstream(gcp_sensorBucket)
Upvotes: 1
Views: 4592
Reputation: 1452
The base_sensor_operator module was added in Airflow 1.10; it is not present in versions 1.8 and 1.9. In those versions, the import was done as follows:
from airflow.operators.sensors import BaseSensorOperator
So in Airflow 1.10 the sensor base class lives under airflow.sensors, while in Airflow 1.9 and 1.8 it lives under airflow.operators.
This issue therefore appears to be a version mismatch between Composer and Airflow, and Airflow 1.10 was not yet available for Composer in August. The issue was in fact reported on Google's Issue Tracker, and the response there is to use Apache Airflow 1.10, which is available with Composer version 1.3.
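If you write your own sensors and need them to load on both layouts, one option is to try the two import locations mentioned above in order. The following is only a sketch: import_first and BASE_SENSOR_CANDIDATES are hypothetical names introduced here, not part of Airflow, and it will not help when a packaged module (such as the contrib GCS sensor) itself performs the 1.10-style import.

```python
import importlib


def import_first(candidates):
    """Return the first attribute importable from a list of (module, name) pairs.

    Hypothetical helper for illustration; not part of Airflow.
    """
    errors = []
    for module_path, attr in candidates:
        try:
            module = importlib.import_module(module_path)
            return getattr(module, attr)
        except (ImportError, AttributeError) as exc:
            # Remember why this location failed, then try the next one.
            errors.append("%s.%s: %s" % (module_path, attr, exc))
    raise ImportError(
        "none of the candidates could be imported:\n" + "\n".join(errors)
    )


# The two locations named in this answer, newest layout first.
BASE_SENSOR_CANDIDATES = [
    ("airflow.sensors.base_sensor_operator", "BaseSensorOperator"),  # Airflow 1.10
    ("airflow.operators.sensors", "BaseSensorOperator"),  # Airflow 1.8 / 1.9
]
```

In a DAG file or plugin you would then call BaseSensorOperator = import_first(BASE_SENSOR_CANDIDATES) and subclass the result as usual.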
Upvotes: 1
Reputation: 691
Which Composer version are you using? I copied and pasted your DAG code into a newly created Composer environment (composer-1.1.0-airflow-1.9.0), and it works out of the box for me.
Upvotes: 0