MT467
MT467

Reputation: 698

getting error of import in airflow DAG in google cloud composer

I have a DAG in google composer (airflow) that imports:

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor

when I run the DAG I got this error:

"ImportError: No module named sensors.base_sensor_operator"

Basically I want to check if a file exists in a bucket before doing something else.

Here is the complete python code:

from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import 
GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import 
GoogleCloudStorageObjectSensor


CONNECTION_ID = 'something'

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 8, 21, 0, 0),
}

def print_hello():
return 'youtube folder exists!!'

with DAG('DATA_TRANSFER_GCP_BUCKET_TO_BQ2', 
schedule_interval=timedelta(days=1), 
 default_args=default_args ) as dag: 

 gcp_sensorBucket=GoogleCloudStorageObjectSensor(
 task_id='gcp_sensorbucket',
 bucket='/aa_youtube_new/2018/06/04/', 
 #bucket='{{var.value.gcp_youtube_video_bucket}}/2018/06/04/',
 object='*.csv', 
 google_cloud_conn_id=CONNECTION_ID

)
hello_operator = PythonOperator(task_id='hello_task', 
python_callable=print_hello)
hello_operator.set_upstream(gcp_sensorBucket)

Upvotes: 1

Views: 4592

Answers (2)

Héctor Neri
Héctor Neri

Reputation: 1452

I see that the import from base_sensor_operator was added in the version 1.10 of Airflow. This was not present in versions 1.8 and 1.9. Instead, the import was done as follows:

from airflow.operators.sensors import BaseSensorOperator

So, checking the sensors, the related sensor for Airflow 1.10 is under sensors but for Airflow 1.9 and 1.8, the sensor is under operators.

So, this issue seems to be related to the versioning of Composer and Airflow, but Airflow 1.10 wasn't available for Composer on August. In fact, this issue was reported on Google's Issue Tracker and the response is to solve this by using Apache Airflow version 1.10, which can be included on the version 1.3 of Composer.

Upvotes: 1

Feng Lu
Feng Lu

Reputation: 691

Which Composer version are you using? I copy and paste your DAG code into a newly created Composer environment composer-1.1.0-airflow-1.9.0, it works out of the box for me.

Here's the screenshot: enter image description here

Upvotes: 0

Related Questions